Adding the SQL aggregate function update for count, min, max, and sum. The code if functional for scalar and aggregate implementations. Still working on avg and all serializable implementations.
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 299bbfb..8f0bab3 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -304,6 +304,57 @@
     public final static FunctionIdentifier SERIAL_LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "local-avg-serial", 1);
 
+    // sql aggregate functions
+    public final static FunctionIdentifier SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-avg", 1);
+    public final static FunctionIdentifier SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-count", 1);
+    public final static FunctionIdentifier SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-sum", 1);
+    public final static FunctionIdentifier LOCAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-sum", 1);
+    public final static FunctionIdentifier SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-max", 1);
+    public final static FunctionIdentifier LOCAL_SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-max", 1);
+    public final static FunctionIdentifier SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-sql-min", 1);
+    public final static FunctionIdentifier LOCAL_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-min", 1);
+    public final static FunctionIdentifier GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-global-sql-avg", 1);
+    public final static FunctionIdentifier LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "agg-local-sql-avg", 1);
+
+    public final static FunctionIdentifier SCALAR_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-avg", 1);
+    public final static FunctionIdentifier SCALAR_SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-count", 1);
+    public final static FunctionIdentifier SCALAR_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-sum", 1);
+    public final static FunctionIdentifier SCALAR_SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-max", 1);
+    public final static FunctionIdentifier SCALAR_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-min", 1);
+    public final static FunctionIdentifier SCALAR_GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "global-sql-avg", 1);
+    public final static FunctionIdentifier SCALAR_LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-avg", 1);
+
+    // serializable sql aggregate functions
+    public final static FunctionIdentifier SERIAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-avg-serial", 1);
+    public final static FunctionIdentifier SERIAL_SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-count-serial", 1);
+    public final static FunctionIdentifier SERIAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sql-sum-serial", 1);
+    public final static FunctionIdentifier SERIAL_LOCAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-sum-serial", 1);
+    public final static FunctionIdentifier SERIAL_GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "global-sql-avg-serial", 1);
+    public final static FunctionIdentifier SERIAL_LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sql-avg-serial", 1);
+
     public final static FunctionIdentifier SCAN_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "scan-collection", 1);
     public final static FunctionIdentifier SUBSET_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -739,6 +790,29 @@
         addFunction(RANGE, AInt32TypeComputer.INSTANCE, true);
         addFunction(RECTANGLE_CONSTRUCTOR, OptionalARectangleTypeComputer.INSTANCE, true);
 
+        // SQL Aggregate Functions
+        addFunction(SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addFunction(SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addFunction(SQL_MAX, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_MAX, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addFunction(SQL_MIN, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_MIN, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+        addFunction(SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(LOCAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addFunction(SCALAR_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(SCALAR_GLOBAL_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addPrivateFunction(SCALAR_LOCAL_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_MAX, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_MIN, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addFunction(SCALAR_SQL_SUM, ScalarVersionOfAggregateResultType.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_GLOBAL_SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_LOCAL_SQL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+        addPrivateFunction(SERIAL_LOCAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+
         addFunction(SCALAR_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
         addFunction(SCALAR_COUNT, AInt64TypeComputer.INSTANCE, true);
         addPrivateFunction(SCALAR_GLOBAL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
@@ -913,6 +987,16 @@
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_MAX), getAsterixFunctionInfo(MAX));
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_MIN), getAsterixFunctionInfo(MIN));
         scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SUM), getAsterixFunctionInfo(SUM));
+        // SQL Aggregate Functions
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_AVG), getAsterixFunctionInfo(SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_COUNT), getAsterixFunctionInfo(SQL_COUNT));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_GLOBAL_SQL_AVG),
+                getAsterixFunctionInfo(GLOBAL_SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_LOCAL_SQL_AVG),
+                getAsterixFunctionInfo(LOCAL_SQL_AVG));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_MAX), getAsterixFunctionInfo(SQL_MAX));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_MIN), getAsterixFunctionInfo(SQL_MIN));
+        scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_SUM), getAsterixFunctionInfo(SQL_SUM));
     }
 
     static {
@@ -964,6 +1048,55 @@
         addAgg(SERIAL_LOCAL_SUM);
         addLocalAgg(SERIAL_SUM, SERIAL_LOCAL_SUM);
         addGlobalAgg(SERIAL_SUM, SERIAL_SUM);
+
+        // SQL Aggregate Functions
+        addAgg(SQL_AVG);
+        addAgg(LOCAL_SQL_AVG);
+        addAgg(GLOBAL_SQL_AVG);
+        addLocalAgg(SQL_AVG, LOCAL_SQL_AVG);
+        addGlobalAgg(SQL_AVG, GLOBAL_SQL_AVG);
+
+        addAgg(SQL_COUNT);
+        addLocalAgg(SQL_COUNT, SQL_COUNT);
+        addGlobalAgg(SQL_COUNT, SQL_SUM);
+
+        addAgg(SQL_MAX);
+        addAgg(LOCAL_SQL_MAX);
+        addLocalAgg(SQL_MAX, LOCAL_SQL_MAX);
+        addGlobalAgg(SQL_MAX, SQL_MAX);
+
+        addAgg(SQL_MIN);
+        addLocalAgg(SQL_MIN, LOCAL_SQL_MIN);
+        addGlobalAgg(SQL_MIN, SQL_MIN);
+
+        addAgg(SQL_SUM);
+        addAgg(LOCAL_SQL_SUM);
+        addLocalAgg(SQL_SUM, LOCAL_SQL_SUM);
+        addGlobalAgg(SQL_SUM, SQL_SUM);
+
+        // SQL serializable aggregate functions
+        addSerialAgg(SQL_AVG, SERIAL_SQL_AVG);
+        addSerialAgg(SQL_COUNT, SERIAL_SQL_COUNT);
+        addSerialAgg(SQL_SUM, SERIAL_SQL_SUM);
+        addSerialAgg(LOCAL_SQL_SUM, SERIAL_LOCAL_SQL_SUM);
+        addSerialAgg(LOCAL_SQL_AVG, SERIAL_LOCAL_SQL_AVG);
+        addSerialAgg(GLOBAL_SQL_AVG, SERIAL_GLOBAL_SQL_AVG);
+
+        addAgg(SERIAL_SQL_COUNT);
+        addLocalAgg(SERIAL_SQL_COUNT, SERIAL_SQL_COUNT);
+        addGlobalAgg(SERIAL_SQL_COUNT, SERIAL_SQL_SUM);
+
+        addAgg(SERIAL_SQL_AVG);
+        addAgg(SERIAL_LOCAL_SQL_AVG);
+        addAgg(SERIAL_GLOBAL_SQL_AVG);
+        addLocalAgg(SERIAL_SQL_AVG, SERIAL_LOCAL_SQL_AVG);
+        addGlobalAgg(SERIAL_SQL_AVG, SERIAL_GLOBAL_SQL_AVG);
+
+        addAgg(SERIAL_SQL_SUM);
+        addAgg(SERIAL_LOCAL_SQL_SUM);
+        addLocalAgg(SERIAL_SQL_SUM, SERIAL_LOCAL_SQL_SUM);
+        addGlobalAgg(SERIAL_SQL_SUM, SERIAL_SQL_SUM);
+
     }
 
     static {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..241ee46
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlAvgAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_AVG;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..1b581e4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlCountAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_COUNT;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..f58d578
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMaxAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_MAX;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..da6e38d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMinAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_MIN;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..14aa6b7
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlSumAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_SUM;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..d322b98
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableGlobalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_GLOBAL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        final ICopyEvaluatorFactory[] evals = args;
+        List<IAType> unionList = new ArrayList<IAType>();
+        unionList.add(BuiltinType.ANULL);
+        unionList.add(BuiltinType.ADOUBLE);
+        ARecordType _recType;
+        try {
+            _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+
+        final ARecordType recType = _recType;
+
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+
+                return new ICopySerializableAggregateFunction() {
+                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
+                    private AMutableDouble aDouble = new AMutableDouble(0);
+                    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+                    private ClosedRecordConstructorEval recordEval;
+
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.AINT64);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ANULL);
+
+                    @Override
+                    public void init(DataOutput state) throws AlgebricksException {
+                        try {
+                            state.writeDouble(0.0);
+                            state.writeLong(0);
+                            state.writeBoolean(false);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    @Override
+                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
+                            throws AlgebricksException {
+                        double globalSum = BufferSerDeUtil.getDouble(state, start);
+                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+
+                        inputVal.reset();
+                        eval.evaluate(tuple);
+                        byte[] serBytes = inputVal.getByteArray();
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+                        switch (typeTag) {
+                            case NULL: {
+                                metNull = true;
+                                break;
+                            }
+                            case SYSTEM_NULL: {
+                                // Ignore and return.
+                                return;
+                            }
+                            case RECORD: {
+                                // Expected.
+                                break;
+                            }
+                            default: {
+                                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                            }
+                        }
+                        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
+                        if (offset1 == 0) // the sum is null
+                            metNull = true;
+                        else
+                            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+                        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
+                        if (offset2 != 0) // the count is not null
+                            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+
+                        BufferSerDeUtil.writeDouble(globalSum, state, start);
+                        BufferSerDeUtil.writeLong(globalCount, state, start + 8);
+                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+
+                    }
+
+                    @Override
+                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+                        double globalSum = BufferSerDeUtil.getDouble(state, start);
+                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+
+                        try {
+                            if (globalCount == 0 || metNull)
+                                nullSerde.serialize(ANull.NULL, result);
+                            else {
+                                aDouble.setValue(globalSum / globalCount);
+                                doubleSerde.serialize(aDouble, result);
+                            }
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    @Override
+                    public void finishPartial(byte[] state, int start, int len, DataOutput result)
+                            throws AlgebricksException {
+                        double globalSum = BufferSerDeUtil.getDouble(state, start);
+                        long globalCount = BufferSerDeUtil.getLong(state, start + 8);
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+
+                        if (recordEval == null)
+                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
+                                    evalCount }, avgBytes, result);
+
+                        try {
+                            if (globalCount == 0 || metNull) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(globalSum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
+                            }
+                            countBytes.reset();
+                            aInt64.setValue(globalCount);
+                            longSerde.serialize(aInt64, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..f65b27d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableLocalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        final ICopyEvaluatorFactory[] evals = args;
+        List<IAType> unionList = new ArrayList<IAType>();
+        unionList.add(BuiltinType.ANULL);
+        unionList.add(BuiltinType.ADOUBLE);
+        ARecordType tmpRecType;
+        try {
+            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+
+        final ARecordType recType = tmpRecType;
+
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new ICopySerializableAggregateFunction() {
+                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
+                    private ClosedRecordConstructorEval recordEval;
+
+                    private AMutableDouble aDouble = new AMutableDouble(0);
+                    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.AINT64);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ANULL);
+
+                    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+                    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+                    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+                    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+                    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+                    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+                    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+
+                    @Override
+                    public void init(DataOutput state) throws AlgebricksException {
+                        try {
+                            state.writeDouble(0.0);
+                            state.writeLong(0);
+                            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    @Override
+                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
+                            throws AlgebricksException {
+                        inputVal.reset();
+                        eval.evaluate(tuple);
+                        double sum = BufferSerDeUtil.getDouble(state, start);
+                        long count = BufferSerDeUtil.getLong(state, start + 8);
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+                            aggType = ATypeTag.NULL;
+                            return;
+                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+                            aggType = typeTag;
+                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+                            throw new AlgebricksException("Unexpected type " + typeTag
+                                    + " in aggregation input stream. Expected type " + aggType + ".");
+                        }
+                        ++count;
+                        switch (typeTag) {
+                            case INT8: {
+                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT16: {
+                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT32: {
+                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT64: {
+                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case FLOAT: {
+                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case DOUBLE: {
+                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case NULL: {
+                                break;
+                            }
+                            default: {
+                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+                            }
+                        }
+                        BufferSerDeUtil.writeDouble(sum, state, start);
+                        BufferSerDeUtil.writeLong(count, state, start + 8);
+                        state[start + 16] = aggType.serialize();
+                    }
+
+                    @Override
+                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+                        double sum = BufferSerDeUtil.getDouble(state, start);
+                        long count = BufferSerDeUtil.getLong(state, start + 8);
+                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+                        if (recordEval == null) {
+                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
+                                    evalCount }, avgBytes, result);
+                        }
+                        try {
+                            if (count == 0) {
+                                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                                return;
+                            }
+                            if (aggType == ATypeTag.NULL) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(sum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
+                            }
+                            countBytes.reset();
+                            aInt64.setValue(count);
+                            int64Serde.serialize(aInt64, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    @Override
+                    public void finishPartial(byte[] state, int start, int len, DataOutput partialResult)
+                            throws AlgebricksException {
+                        finish(state, start, len, partialResult);
+                    }
+
+                };
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..7b2b18d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableLocalSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSumAggregateFunction(args, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..0e6be6c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableSqlCountAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SQL_COUNT;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+
+                return new ICopySerializableAggregateFunction() {
+                    private AMutableInt64 result = new AMutableInt64(-1);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.AINT64);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ANULL);
+                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
+
+                    @Override
+                    public void init(DataOutput state) throws AlgebricksException {
+                        try {
+                            state.writeBoolean(false);
+                            state.writeLong(0);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    @Override
+                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
+                            throws AlgebricksException {
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
+                        inputVal.reset();
+                        eval.evaluate(tuple);
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        if (typeTag == ATypeTag.NULL) {
+                            metNull = true;
+                        } else {
+                            cnt++;
+                        }
+                        BufferSerDeUtil.writeBoolean(metNull, state, start);
+                        BufferSerDeUtil.writeLong(cnt, state, start + 1);
+                    }
+
+                    @Override
+                    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+                        long cnt = BufferSerDeUtil.getLong(state, start + 1);
+                        try {
+                            if (metNull) {
+                                nullSerde.serialize(ANull.NULL, out);
+                            } else {
+                                result.setValue(cnt);
+                                int64Serde.serialize(result, out);
+                            }
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    @Override
+                    public void finishPartial(byte[] state, int start, int len, DataOutput out)
+                            throws AlgebricksException {
+                        finish(state, start, len, out);
+                    }
+                };
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..4c9dd88
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SUM;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSumAggregateFunction(args, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
new file mode 100644
index 0000000..5c724a4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunction {
+    private final ARecordType recType;
+
+    private DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private double sum;
+    private long count;
+    private ATypeTag aggType;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private ClosedRecordConstructorEval recordEval;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+
+    public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+
+        List<IAType> unionList = new ArrayList<IAType>();
+        unionList.add(BuiltinType.ANULL);
+        unionList.add(BuiltinType.ADOUBLE);
+        ARecordType tmpRecType;
+        try {
+            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+
+        recType = tmpRecType;
+        recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount }, avgBytes,
+                out);
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+        count = 0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+            aggType = ATypeTag.NULL;
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
+        if (typeTag != ATypeTag.SYSTEM_NULL) {
+            ++count;
+        }
+
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+            }
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            if (count == 0 || aggType == ATypeTag.NULL) {
+                nullSerde.serialize(ANull.NULL, out);
+            } else {
+                aDouble.setValue(sum / count);
+                doubleSerde.serialize(aDouble, out);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        if (count == 0) {
+            if (GlobalConfig.DEBUG) {
+                GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+            }
+        } else {
+            try {
+                if (aggType == ATypeTag.NULL) {
+                    sumBytes.reset();
+                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                } else {
+                    sumBytes.reset();
+                    aDouble.setValue(sum);
+                    doubleSerde.serialize(aDouble, sumBytesOutput);
+                }
+                countBytes.reset();
+                aInt64.setValue(count);
+                intSerde.serialize(aInt64, countBytesOutput);
+                recordEval.evaluate(null);
+            } catch (IOException e) {
+                throw new AlgebricksException(e);
+            }
+        }
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
new file mode 100644
index 0000000..bae42a9
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
+ */
+public abstract class AbstractCountAggregateFunction implements ICopyAggregateFunction {
+    private AMutableInt64 result = new AMutableInt64(-1);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private long cnt;
+    private DataOutput out;
+
+    public AbstractCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+    }
+
+    @Override
+    public void init() {
+        cnt = 0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        // Ignore SYSTEM_NULL.
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+        } else if (typeTag != ATypeTag.SYSTEM_NULL) {
+            cnt++;
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            result.setValue(cnt);
+            int64Serde.serialize(result, out);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected void processNull() {
+        cnt++;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..253be6d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractGlobalAvgAggregateFunction implements ICopyAggregateFunction {
+    private final ARecordType recType;
+
+    private DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private double globalSum;
+    private long globalCount;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private ClosedRecordConstructorEval recordEval;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+    private boolean metNull;
+
+    public AbstractGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+
+        List<IAType> unionList = new ArrayList<IAType>();
+        unionList.add(BuiltinType.ANULL);
+        unionList.add(BuiltinType.ADOUBLE);
+        ARecordType tmpRecType;
+        try {
+            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+
+        recType = tmpRecType;
+        recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount }, avgBytes,
+                out);
+    }
+
+    @Override
+    public void init() {
+        globalSum = 0.0;
+        globalCount = 0;
+        metNull = false;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        byte[] serBytes = inputVal.getByteArray();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        switch (typeTag) {
+            case NULL: {
+                metNull = true;
+                break;
+            }
+            case SYSTEM_NULL: {
+                // Ignore and return.
+                return;
+            }
+            case RECORD: {
+                // Expected.
+                break;
+            }
+            default: {
+                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+            }
+        }
+
+        // The record length helps us determine whether the input record fields are nullable.
+        int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
+        int nullBitmapSize = 1;
+        if (recordLength == 29) {
+            nullBitmapSize = 0;
+        }
+        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
+        if (offset1 == 0) // the sum is null
+            metNull = true;
+        else
+            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
+        if (offset2 != 0) // the count is not null
+            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            if (globalCount == 0 || metNull)
+                nullSerde.serialize(ANull.NULL, out);
+            else {
+                aDouble.setValue(globalSum / globalCount);
+                doubleSerde.serialize(aDouble, out);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        try {
+            if (metNull || globalCount == 0) {
+                sumBytes.reset();
+                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+            } else {
+                sumBytes.reset();
+                aDouble.setValue(globalSum);
+                doubleSerde.serialize(aDouble, sumBytesOutput);
+            }
+            countBytes.reset();
+            aInt64.setValue(globalCount);
+            longSerde.serialize(aInt64, countBytesOutput);
+            recordEval.evaluate(null);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java
new file mode 100644
index 0000000..72da44d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractLocalAvgAggregateFunction implements ICopyAggregateFunction {
+    private final ARecordType recType;
+
+    private DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private double sum;
+    private long count;
+    private ATypeTag aggType;
+
+    private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+    private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+    private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+    private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+    private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+    private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+    private ClosedRecordConstructorEval recordEval;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+    public AbstractLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+
+        List<IAType> unionList = new ArrayList<IAType>();
+        unionList.add(BuiltinType.ANULL);
+        unionList.add(BuiltinType.ADOUBLE);
+        ARecordType tmpRecType;
+        try {
+            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+        } catch (AsterixException e) {
+            throw new AlgebricksException(e);
+        }
+
+        recType = tmpRecType;
+        recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount }, avgBytes,
+                out);
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+        count = 0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+            aggType = ATypeTag.NULL;
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
+        if (typeTag != ATypeTag.SYSTEM_NULL) {
+            ++count;
+        }
+
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type " + typeTag);
+            }
+        }
+        inputVal.reset();
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            if (count == 0 && aggType != ATypeTag.NULL) {
+                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                return;
+            }
+            if (aggType == ATypeTag.NULL) {
+                sumBytes.reset();
+                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+            } else {
+                sumBytes.reset();
+                aDouble.setValue(sum);
+                doubleSerde.serialize(aDouble, sumBytesOutput);
+            }
+            countBytes.reset();
+            aInt64.setValue(count);
+            longSerde.serialize(aInt64, countBytesOutput);
+            recordEval.evaluate(null);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
new file mode 100644
index 0000000..09c0171
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.om.types.hierachy.ITypePromoteComputer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateFunction {
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
+    private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
+    private DataOutput out;
+    private ICopyEvaluator eval;
+    private ATypeTag aggType;
+    private IBinaryComparator cmp;
+    private ITypePromoteComputer tpc;
+    private final boolean isMin;
+    private final boolean isLocalAgg;
+
+    public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
+            boolean isLocalAgg) throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isMin = isMin;
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        outputVal.reset();
+        tempValForCasting.reset();
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+            return;
+        } else if (aggType == ATypeTag.NULL) {
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            if (typeTag == ATypeTag.SYSTEM_NULL) {
+                // Ignore.
+                return;
+            }
+            // First value encountered. Set type, comparator, and initial value.
+            aggType = typeTag;
+            // Set comparator.
+            IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(aggType, isMin);
+            cmp = cmpFactory.createBinaryComparator();
+            // Initialize min value.
+            outputVal.assign(inputVal);
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else {
+
+            // If a system_null is encountered locally, it would be an error; otherwise if it is seen
+            // by a global aggregator, it is simple ignored.
+            if (typeTag == ATypeTag.SYSTEM_NULL) {
+                if (isLocalAgg) {
+                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+                } else {
+                    return;
+                }
+            }
+
+            if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
+                aggType = typeTag;
+                cmp = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
+                        .createBinaryComparator();
+                if (tpc != null) {
+                    tempValForCasting.reset();
+                    try {
+                        tpc.promote(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
+                                outputVal.getLength() - 1, tempValForCasting);
+                    } catch (IOException e) {
+                        throw new AlgebricksException(e);
+                    }
+                    outputVal.reset();
+                    outputVal.assign(tempValForCasting);
+                }
+                if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+                        outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+                    outputVal.assign(inputVal);
+                }
+
+            } else {
+                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
+                if (tpc != null) {
+                    tempValForCasting.reset();
+                    try {
+                        tpc.promote(inputVal.getByteArray(), inputVal.getStartOffset() + 1, inputVal.getLength() - 1,
+                                tempValForCasting);
+                    } catch (IOException e) {
+                        throw new AlgebricksException(e);
+                    }
+                    if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
+                            tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
+                            outputVal.getLength()) < 0) {
+                        outputVal.assign(tempValForCasting);
+                    }
+                } else {
+                    if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+                            outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+                        outputVal.assign(inputVal);
+                    }
+                }
+
+            }
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case NULL: {
+                    out.writeByte(ATypeTag.NULL.serialize());
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    // Empty stream. For local agg return system null. For global agg return null.
+                    if (isLocalAgg) {
+                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                    } else {
+                        out.writeByte(ATypeTag.NULL.serialize());
+                    }
+                    break;
+                }
+                default: {
+                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
new file mode 100644
index 0000000..b953f78
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunction {
+    private DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private double sum;
+    private ATypeTag aggType;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    private ISerializerDeserializer serde;
+
+    private final boolean isLocalAgg;
+
+    public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL) {
+            processNull();
+            return;
+        } else if (aggType == ATypeTag.NULL) {
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag
+                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
+        }
+
+        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                break;
+            }
+            case SYSTEM_NULL: {
+                // For global aggregates simply ignore system null here,
+                // but if all input value are system null, then we should return
+                // null in finish().
+                if (isLocalAgg) {
+                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+                }
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case INT8: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+                    aInt8.setValue((byte) sum);
+                    serde.serialize(aInt8, out);
+                    break;
+                }
+                case INT16: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+                    aInt16.setValue((short) sum);
+                    serde.serialize(aInt16, out);
+                    break;
+                }
+                case INT32: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+                    aInt32.setValue((int) sum);
+                    serde.serialize(aInt32, out);
+                    break;
+                }
+                case INT64: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+                    aInt64.setValue((long) sum);
+                    serde.serialize(aInt64, out);
+                    break;
+                }
+                case FLOAT: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+                    aFloat.setValue((float) sum);
+                    serde.serialize(aFloat, out);
+                    break;
+                }
+                case DOUBLE: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    aDouble.setValue(sum);
+                    serde.serialize(aDouble, out);
+                    break;
+                }
+                case NULL: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    // Empty stream. For local agg return system null. For global agg return null.
+                    if (isLocalAgg) {
+                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                    } else {
+                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                        serde.serialize(ANull.NULL, out);
+                    }
+                    break;
+                }
+                default:
+                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+                            + aggType + "). ");
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
new file mode 100644
index 0000000..5fd1e9c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class AvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public AvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+        super(args, output);
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
index c63ab62..56afdd6 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
@@ -14,70 +14,16 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
  */
-public class CountAggregateFunction implements ICopyAggregateFunction {
-    private AMutableInt64 result = new AMutableInt64(-1);
-    @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private long cnt;
-    private DataOutput out;
+public class CountAggregateFunction extends AbstractCountAggregateFunction {
 
     public CountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
-        eval = args[0].createEvaluator(inputVal);
-        out = output.getDataOutput();
-    }
-
-    @Override
-    public void init() {
-        cnt = 0;
-    }
-
-    @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        // Ignore SYSTEM_NULL.
-        if (typeTag != ATypeTag.SYSTEM_NULL) {
-            cnt++;
-        }
-    }
-
-    @Override
-    public void finish() throws AlgebricksException {
-        try {
-            result.setValue(cnt);
-            int64Serde.serialize(result, out);
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
+        super(args, output);
     }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..47f87ff
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class GlobalAvgAggregateFunction extends AbstractGlobalAvgAggregateFunction {
+
+    public GlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
new file mode 100644
index 0000000..49bbd2f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalAvgAggregateFunction extends AbstractLocalAvgAggregateFunction {
+
+    public LocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
index 9e411b6..c540c43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..6b31244
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_MAX;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, false, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..4946703
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_MIN;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, true, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..1f47727
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_SUM;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlSumAggregateFunction(args, provider, true);
+            };
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
index 3e0b65c..0e35cd3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
index 8599913..7612c9c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -14,164 +14,14 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
-import edu.uci.ics.asterix.om.types.hierachy.ITypePromoteComputer;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class MinMaxAggregateFunction implements ICopyAggregateFunction {
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
-    private DataOutput out;
-    private ICopyEvaluator eval;
-    private ATypeTag aggType;
-    private IBinaryComparator cmp;
-    private ITypePromoteComputer tpc;
-    private final boolean isMin;
-    private final boolean isLocalAgg;
+public class MinMaxAggregateFunction extends AbstractMinMaxAggregateFunction{
 
     public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
             boolean isLocalAgg) throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
-        this.isMin = isMin;
-        this.isLocalAgg = isLocalAgg;
-    }
-
-    @Override
-    public void init() {
-        aggType = ATypeTag.SYSTEM_NULL;
-        outputVal.reset();
-        tempValForCasting.reset();
-    }
-
-    @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
-        }
-        if (aggType == ATypeTag.SYSTEM_NULL) {
-            if (typeTag == ATypeTag.SYSTEM_NULL) {
-                // Ignore.
-                return;
-            }
-            // First value encountered. Set type, comparator, and initial value.
-            aggType = typeTag;
-            // Set comparator.
-            IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
-                    .getBinaryComparatorFactory(aggType, isMin);
-            cmp = cmpFactory.createBinaryComparator();
-            // Initialize min value.
-            outputVal.assign(inputVal);
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
-                    + aggType + ".");
-        } else {
-
-            // If a system_null is encountered locally, it would be an error; otherwise if it is seen
-            // by a global aggregator, it is simple ignored.
-            if (typeTag == ATypeTag.SYSTEM_NULL) {
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                } else {
-                    return;
-                }
-            }
-
-            if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
-                aggType = typeTag;
-                cmp = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
-                        .createBinaryComparator();
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.promote(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
-                                outputVal.getLength() - 1, tempValForCasting);
-                    } catch (IOException e) {
-                        throw new AlgebricksException(e);
-                    }
-                    outputVal.reset();
-                    outputVal.assign(tempValForCasting);
-                }
-                if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
-                        outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                    outputVal.assign(inputVal);
-                }
-
-            } else {
-                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.promote(inputVal.getByteArray(), inputVal.getStartOffset() + 1, inputVal.getLength() - 1,
-                                tempValForCasting);
-                    } catch (IOException e) {
-                        throw new AlgebricksException(e);
-                    }
-                    if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
-                            tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
-                            outputVal.getLength()) < 0) {
-                        outputVal.assign(tempValForCasting);
-                    }
-                } else {
-                    if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
-                            outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                        outputVal.assign(inputVal);
-                    }
-                }
-
-            }
-        }
-    }
-
-    @Override
-    public void finish() throws AlgebricksException {
-        try {
-            switch (aggType) {
-                case NULL: {
-                    out.writeByte(ATypeTag.NULL.serialize());
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        out.writeByte(ATypeTag.NULL.serialize());
-                    }
-                    break;
-                }
-                default: {
-                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
+        super(args, provider, isMin, isLocalAgg);
     }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..b484b53
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/**
+ * NULLs are also counted.
+ */
+public class SqlCountAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_COUNT;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlCountAggregateFunction(args, provider);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java
new file mode 100644
index 0000000..5fb5d72
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/**
+ * COUNT returns the number of non-null items in the given list. Note that COUNT(NULL) is not allowed.
+ */
+public class SqlCountAggregateFunction extends AbstractCountAggregateFunction {
+
+    public SqlCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output);
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..c8e415f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_MAX;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, false, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..fe934f8
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_MIN;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlMinMaxAggregateFunction(args, provider, true, false);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
new file mode 100644
index 0000000..2111ccd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
+
+    public SqlMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
+            boolean isLocalAgg) throws AlgebricksException {
+        super(args, provider, isMin, isLocalAgg);
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..294d9c1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_SUM;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlSumAggregateFunction(args, provider, false);
+            };
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
new file mode 100644
index 0000000..4bd533d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlSumAggregateFunction extends AbstractSumAggregateFunction {
+
+    public SqlSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+            throws AlgebricksException {
+        super(args, provider, isLocalAgg);
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 31ad055..52ec126 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -14,202 +14,14 @@
  */
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-public class SumAggregateFunction implements ICopyAggregateFunction {
-    private DataOutput out;
-    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-    private ICopyEvaluator eval;
-    private double sum;
-    private ATypeTag aggType;
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-    @SuppressWarnings("rawtypes")
-    private ISerializerDeserializer serde;
-
-    private final boolean isLocalAgg;
+public class SumAggregateFunction extends AbstractSumAggregateFunction {
 
     public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
             throws AlgebricksException {
-        out = provider.getDataOutput();
-        eval = args[0].createEvaluator(inputVal);
-        this.isLocalAgg = isLocalAgg;
-    }
-
-    @Override
-    public void init() {
-        aggType = ATypeTag.SYSTEM_NULL;
-        sum = 0.0;
-    }
-
-    @Override
-    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-        inputVal.reset();
-        eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            throw new AlgebricksException("Unexpected type " + typeTag
-                    + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
-        }
-
-        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-            aggType = typeTag;
-        }
-
-        switch (typeTag) {
-            case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case NULL: {
-                break;
-            }
-            case SYSTEM_NULL: {
-                // For global aggregates simply ignore system null here,
-                // but if all input value are system null, then we should return
-                // null in finish().
-                if (isLocalAgg) {
-                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
-                }
-                break;
-            }
-            default: {
-                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void finish() throws AlgebricksException {
-        try {
-            switch (aggType) {
-                case INT8: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-                    aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, out);
-                    break;
-                }
-                case INT16: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
-                    aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, out);
-                    break;
-                }
-                case INT32: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-                    aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, out);
-                    break;
-                }
-                case INT64: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-                    aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, out);
-                    break;
-                }
-                case FLOAT: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
-                    aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, out);
-                    break;
-                }
-                case DOUBLE: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    aDouble.setValue(sum);
-                    serde.serialize(aDouble, out);
-                    break;
-                }
-                case NULL: {
-                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    // Empty stream. For local agg return system null. For global agg return null.
-                    if (isLocalAgg) {
-                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                    } else {
-                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                        serde.serialize(ANull.NULL, out);
-                    }
-                    break;
-                }
-                default:
-                    throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
-                            + aggType + "). ");
-            }
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public void finishPartial() throws AlgebricksException {
-        finish();
+        super(args, provider, isLocalAgg);
     }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index ea75c77..7801b53 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -65,12 +65,22 @@
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
@@ -78,9 +88,16 @@
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.SumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.stream.EmptyStreamAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.stream.NonEmptyStreamAggregateDescriptor;
@@ -426,6 +443,33 @@
         temp.add(ScalarMaxAggregateDescriptor.FACTORY);
         temp.add(ScalarMinAggregateDescriptor.FACTORY);
 
+        // SQL aggregates
+        temp.add(SqlCountAggregateDescriptor.FACTORY);
+//        temp.add(SqlAvgAggregateDescriptor.FACTORY);
+//        temp.add(LocalSqlAvgAggregateDescriptor.FACTORY);
+//        temp.add(GlobalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SqlSumAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlSumAggregateDescriptor.FACTORY);
+        temp.add(SqlMaxAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlMaxAggregateDescriptor.FACTORY);
+        temp.add(SqlMinAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlMinAggregateDescriptor.FACTORY);
+
+        // SQL serializable aggregates
+        temp.add(SerializableSqlCountAggregateDescriptor.FACTORY);
+//        temp.add(SerializableSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableLocalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableGlobalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableSqlSumAggregateDescriptor.FACTORY);
+        temp.add(SerializableLocalSqlSumAggregateDescriptor.FACTORY);
+
+        // SQL scalar aggregates
+        temp.add(ScalarSqlCountAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlSumAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlMaxAggregateDescriptor.FACTORY);
+        temp.add(ScalarSqlMinAggregateDescriptor.FACTORY);
+
         // new functions - constructors
         temp.add(ABooleanConstructorDescriptor.FACTORY);
         temp.add(ANullConstructorDescriptor.FACTORY);