Finished SQL-AVG for aggregate and scalar cases.
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
index d322b98..5d93018 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -65,7 +65,7 @@
 
     @Override
     public FunctionIdentifier getIdentifier() {
-        return AsterixBuiltinFunctions.SERIAL_GLOBAL_AVG;
+        return AsterixBuiltinFunctions.SERIAL_GLOBAL_SQL_AVG;
     }
 
     @Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
index f65b27d..6c8b6e0 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -69,7 +69,7 @@
 
     @Override
     public FunctionIdentifier getIdentifier() {
-        return AsterixBuiltinFunctions.SERIAL_LOCAL_AVG;
+        return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_AVG;
     }
 
     @Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
index 7b2b18d..dc91711 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -27,7 +27,7 @@
 public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
+    private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableLocalSqlSumAggregateDescriptor();
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..0ed0f13
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SERIAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        final ICopyEvaluatorFactory[] evals = args;
+
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new ICopySerializableAggregateFunction() {
+                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+                    private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
+
+                    private AMutableDouble aDouble = new AMutableDouble(0);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ANULL);
+
+                    @Override
+                    public void init(DataOutput state) throws AlgebricksException {
+                        try {
+                            state.writeDouble(0.0);
+                            state.writeLong(0);
+                            state.writeBoolean(false);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    @Override
+                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
+                            throws AlgebricksException {
+                        inputVal.reset();
+                        eval.evaluate(tuple);
+                        double sum = BufferSerDeUtil.getDouble(state, start);
+                        long count = BufferSerDeUtil.getLong(state, start + 8);
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+                        if (inputVal.getLength() > 0) {
+                            ++count;
+                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
+                                    .getByteArray()[0]);
+                            switch (typeTag) {
+                                case INT8: {
+                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                                    sum += val;
+                                    break;
+                                }
+                                case INT16: {
+                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                                    sum += val;
+                                    break;
+                                }
+                                case INT32: {
+                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                                    sum += val;
+                                    break;
+                                }
+                                case INT64: {
+                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                                    sum += val;
+                                    break;
+                                }
+                                case FLOAT: {
+                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                                    sum += val;
+                                    break;
+                                }
+                                case DOUBLE: {
+                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                                    sum += val;
+                                    break;
+                                }
+                                case NULL: {
+                                    metNull = true;
+                                    break;
+                                }
+                                default: {
+                                    throw new NotImplementedException("Cannot compute AVG for values of type "
+                                            + typeTag);
+                                }
+                            }
+                            inputVal.reset();
+                        }
+                        BufferSerDeUtil.writeDouble(sum, state, start);
+                        BufferSerDeUtil.writeLong(count, state, start + 8);
+                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+                    }
+
+                    @Override
+                    public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+                        double sum = BufferSerDeUtil.getDouble(state, start);
+                        long count = BufferSerDeUtil.getLong(state, start + 8);
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+
+                        if (count == 0) {
+                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
+                        } else {
+                            try {
+                                if (metNull)
+                                    nullSerde.serialize(ANull.NULL, result);
+                                else {
+                                    aDouble.setValue(sum / count);
+                                    doubleSerde.serialize(aDouble, result);
+                                }
+                            } catch (IOException e) {
+                                throw new AlgebricksException(e);
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
+                            throws AlgebricksException {
+                        try {
+                            partialResult.write(data, start, len);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                };
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
index 4c9dd88..3a08642 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -35,7 +35,7 @@
 
     @Override
     public FunctionIdentifier getIdentifier() {
-        return AsterixBuiltinFunctions.SERIAL_SUM;
+        return AsterixBuiltinFunctions.SERIAL_SQL_SUM;
     }
 
     @Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
index 30a0f71..d322669 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -128,7 +128,7 @@
         byte[] serBytes = inputVal.getByteArray();
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
         if (typeTag == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
+            processNull();
             return;
         } else if (aggType == ATypeTag.NULL) {
             return;
@@ -280,4 +280,8 @@
             }
         }
     }
+    
+    protected void processNull() {
+        aggType = ATypeTag.NULL;
+    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java
index 253be6d..88b0ebf 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java
@@ -20,9 +20,14 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.om.base.ADouble;
@@ -36,9 +41,11 @@
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
 import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
 import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
@@ -54,10 +61,12 @@
     private DataOutput out;
     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
     private ICopyEvaluator eval;
-    private double globalSum;
-    private long globalCount;
+    private ATypeTag aggType;
+    private double sum;
+    private long count;
     private AMutableDouble aDouble = new AMutableDouble(0);
     private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private final boolean isLocalAgg;
 
     private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
     private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
@@ -69,28 +78,33 @@
     private ClosedRecordConstructorEval recordEval;
 
     @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AINT64);
-    @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ADOUBLE);
     @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ANULL);
-    private boolean metNull;
 
-    public AbstractGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
-            throws AlgebricksException {
+    public AbstractGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output,
+            boolean isLocalAgg) throws AlgebricksException {
         eval = args[0].createEvaluator(inputVal);
         out = output.getDataOutput();
+        this.isLocalAgg = isLocalAgg;
 
         List<IAType> unionList = new ArrayList<IAType>();
         unionList.add(BuiltinType.ANULL);
         unionList.add(BuiltinType.ADOUBLE);
         ARecordType tmpRecType;
         try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+            if (isLocalAgg) {
+                tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                        new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
+            } else {
+                tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                        new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+            }
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
         }
@@ -102,9 +116,9 @@
 
     @Override
     public void init() {
-        globalSum = 0.0;
-        globalCount = 0;
-        metNull = false;
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+        count = 0;
     }
 
     @Override
@@ -113,50 +127,119 @@
         eval.evaluate(tuple);
         byte[] serBytes = inputVal.getByteArray();
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
-        switch (typeTag) {
-            case NULL: {
-                metNull = true;
-                break;
-            }
-            case SYSTEM_NULL: {
-                // Ignore and return.
+        if (typeTag == ATypeTag.NULL) {
+            aggType = ATypeTag.NULL;
+            return;
+        } else if (aggType == ATypeTag.NULL) {
+            return;
+        } else if (!isLocalAgg && typeTag == ATypeTag.RECORD) {
+            if (aggType == ATypeTag.SYSTEM_NULL) {
                 return;
             }
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+            aggType = typeTag;
+        }
+
+        if (typeTag != ATypeTag.SYSTEM_NULL) {
+            ++count;
+        }
+
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                break;
+            }
             case RECORD: {
-                // Expected.
+                // Expected for global aggregate.
+                if (isLocalAgg) {
+                    throw new AlgebricksException("Global-Avg is not defined for values of type "
+                            + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                } else {
+                    // The record length helps us determine whether the input record fields are nullable.
+                    int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
+                    int nullBitmapSize = 1;
+                    if (recordLength == 29) {
+                        nullBitmapSize = 0;
+                    }
+                    int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
+                    if (offset1 == 0) // the sum is null
+                        aggType = ATypeTag.NULL;
+                    else
+                        sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+                    int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
+                    if (offset2 != 0) // the count is not null
+                        count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+                }
                 break;
             }
             default: {
-                throw new AlgebricksException("Global-Avg is not defined for values of type "
-                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
             }
         }
-
-        // The record length helps us determine whether the input record fields are nullable.
-        int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
-        int nullBitmapSize = 1;
-        if (recordLength == 29) {
-            nullBitmapSize = 0;
-        }
-        int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
-        if (offset1 == 0) // the sum is null
-            metNull = true;
-        else
-            globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
-        int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
-        if (offset2 != 0) // the count is not null
-            globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
+        inputVal.reset();
     }
 
     @Override
     public void finish() throws AlgebricksException {
         try {
-            if (globalCount == 0 || metNull)
-                nullSerde.serialize(ANull.NULL, out);
-            else {
-                aDouble.setValue(globalSum / globalCount);
-                doubleSerde.serialize(aDouble, out);
+            if (isLocalAgg) {
+                if (count == 0 && aggType != ATypeTag.NULL) {
+                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                    return;
+                }
+                if (aggType == ATypeTag.NULL) {
+                    sumBytes.reset();
+                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                } else {
+                    sumBytes.reset();
+                    aDouble.setValue(sum);
+                    doubleSerde.serialize(aDouble, sumBytesOutput);
+                }
+                countBytes.reset();
+                aInt64.setValue(count);
+                intSerde.serialize(aInt64, countBytesOutput);
+                recordEval.evaluate(null);
+            } else {
+                if (count == 0 || aggType == ATypeTag.NULL) {
+                    nullSerde.serialize(ANull.NULL, out);
+                } else {
+                    aDouble.setValue(sum / count);
+                    doubleSerde.serialize(aDouble, out);
+                }
             }
         } catch (IOException e) {
             throw new AlgebricksException(e);
@@ -165,22 +248,31 @@
 
     @Override
     public void finishPartial() throws AlgebricksException {
-        try {
-            if (metNull || globalCount == 0) {
-                sumBytes.reset();
-                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+        if (isLocalAgg) {
+            finish();
+        } else {
+            if (count == 0) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                }
             } else {
-                sumBytes.reset();
-                aDouble.setValue(globalSum);
-                doubleSerde.serialize(aDouble, sumBytesOutput);
+                try {
+                    if (count == 0 || aggType == ATypeTag.NULL) {
+                        sumBytes.reset();
+                        nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                    } else {
+                        sumBytes.reset();
+                        aDouble.setValue(sum);
+                        doubleSerde.serialize(aDouble, sumBytesOutput);
+                    }
+                    countBytes.reset();
+                    aInt64.setValue(count);
+                    intSerde.serialize(aInt64, countBytesOutput);
+                    recordEval.evaluate(null);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
             }
-            countBytes.reset();
-            aInt64.setValue(globalCount);
-            longSerde.serialize(aInt64, countBytesOutput);
-            recordEval.evaluate(null);
-        } catch (IOException e) {
-            throw new AlgebricksException(e);
         }
     }
-
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java
index e950ae7..5be48de 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -27,6 +28,7 @@
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.om.base.ADouble;
 import edu.uci.ics.asterix.om.base.AInt64;
@@ -62,6 +64,9 @@
     private ATypeTag aggType;
     private double sum;
     private long count;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private final boolean isLocalAgg;
 
     private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
     private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
@@ -71,30 +76,36 @@
     private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
     private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
     private ClosedRecordConstructorEval recordEval;
+
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ADOUBLE);
     @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+    private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.AINT64);
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ANULL);
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private boolean metNull;
 
-    public AbstractLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
-            throws AlgebricksException {
+    public AbstractLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output,
+            boolean isLocalAgg) throws AlgebricksException {
         eval = args[0].createEvaluator(inputVal);
         out = output.getDataOutput();
+        this.isLocalAgg = isLocalAgg;
 
         List<IAType> unionList = new ArrayList<IAType>();
         unionList.add(BuiltinType.ANULL);
         unionList.add(BuiltinType.ADOUBLE);
         ARecordType tmpRecType;
         try {
-            tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
-                    new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
+            if (isLocalAgg) {
+                tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                        new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
+            } else {
+                tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+                        new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+            }
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
         }
@@ -109,89 +120,137 @@
         aggType = ATypeTag.SYSTEM_NULL;
         sum = 0.0;
         count = 0;
+        metNull = false;
     }
 
     @Override
     public void step(IFrameTupleReference tuple) throws AlgebricksException {
         inputVal.reset();
         eval.evaluate(tuple);
-        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
-            aggType = ATypeTag.NULL;
-            return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
-                    + aggType + ".");
-        } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-            aggType = typeTag;
-        }
+        byte[] serBytes = inputVal.getByteArray();
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+        if (isLocalAgg) {
+            if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+                aggType = ATypeTag.NULL;
+                return;
+            } else if (aggType == ATypeTag.SYSTEM_NULL) {
+                aggType = typeTag;
+            } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+                throw new AlgebricksException("Unexpected type " + typeTag
+                        + " in aggregation input stream. Expected type " + aggType + ".");
+            } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+                aggType = typeTag;
+            }
 
-        if (typeTag != ATypeTag.SYSTEM_NULL) {
-            ++count;
-        }
+            if (typeTag != ATypeTag.SYSTEM_NULL) {
+                ++count;
+            }
 
-        switch (typeTag) {
-            case INT8: {
-                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
+            switch (typeTag) {
+                case INT8: {
+                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT16: {
+                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT32: {
+                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT64: {
+                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case FLOAT: {
+                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case DOUBLE: {
+                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case NULL: {
+                    break;
+                }
+                default: {
+                    throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type " + typeTag);
+                }
             }
-            case INT16: {
-                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
+            inputVal.reset();
+
+        } else {
+            switch (typeTag) {
+                case NULL: {
+                    metNull = true;
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    // Ignore and return.
+                    return;
+                }
+                case RECORD: {
+                    // Expected.
+                    break;
+                }
+                default: {
+                    throw new AlgebricksException("Global-Avg is not defined for values of type "
+                            + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                }
             }
-            case INT32: {
-                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
+
+            // The record length helps us determine whether the input record fields are nullable.
+            int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
+            int nullBitmapSize = 1;
+            if (recordLength == 29) {
+                nullBitmapSize = 0;
             }
-            case INT64: {
-                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case FLOAT: {
-                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case DOUBLE: {
-                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                sum += val;
-                break;
-            }
-            case NULL: {
-                break;
-            }
-            default: {
-                throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type " + typeTag);
-            }
+            int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
+            if (offset1 == 0) // the sum is null
+                metNull = true;
+            else
+                sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+            int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
+            if (offset2 != 0) // the count is not null
+                count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
         }
-        inputVal.reset();
     }
 
     @Override
     public void finish() throws AlgebricksException {
         try {
-            if (count == 0 && aggType != ATypeTag.NULL) {
-                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-                return;
-            }
-            if (aggType == ATypeTag.NULL) {
-                sumBytes.reset();
-                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+            if (isLocalAgg) {
+                if (count == 0 && aggType != ATypeTag.NULL) {
+                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                    return;
+                }
+                if (aggType == ATypeTag.NULL) {
+                    sumBytes.reset();
+                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                } else {
+                    sumBytes.reset();
+                    aDouble.setValue(sum);
+                    doubleSerde.serialize(aDouble, sumBytesOutput);
+                }
+                countBytes.reset();
+                aInt64.setValue(count);
+                intSerde.serialize(aInt64, countBytesOutput);
+                recordEval.evaluate(null);
             } else {
-                sumBytes.reset();
-                aDouble.setValue(sum);
-                doubleSerde.serialize(aDouble, sumBytesOutput);
+                if (count == 0 || metNull) {
+                    nullSerde.serialize(ANull.NULL, out);
+                } else {
+                    aDouble.setValue(sum / count);
+                    doubleSerde.serialize(aDouble, out);
+                }
             }
-            countBytes.reset();
-            aInt64.setValue(count);
-            longSerde.serialize(aInt64, countBytesOutput);
-            recordEval.evaluate(null);
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
@@ -199,6 +258,31 @@
 
     @Override
     public void finishPartial() throws AlgebricksException {
-        finish();
+        if (isLocalAgg) {
+            finish();
+        } else {
+            if (count == 0) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                }
+            } else {
+                try {
+                    if (metNull || count == 0) {
+                        sumBytes.reset();
+                        nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                    } else {
+                        sumBytes.reset();
+                        aDouble.setValue(sum);
+                        doubleSerde.serialize(aDouble, sumBytesOutput);
+                    }
+                    countBytes.reset();
+                    aInt64.setValue(count);
+                    intSerde.serialize(aInt64, countBytesOutput);
+                    recordEval.evaluate(null);
+                } catch (IOException e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+        }
     }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..0808e81
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class GlobalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new GlobalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.GLOBAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new GlobalSqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..4619c4e
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class GlobalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public GlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output, false);
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..6a1f60e
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.LOCAL_SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new LocalSqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..0fca8a9
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public LocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+            throws AlgebricksException {
+        super(args, output, true);
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..cf3f3dd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.SQL_AVG;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SqlAvgAggregateFunction(args, provider);
+            }
+        };
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
new file mode 100644
index 0000000..08d6353
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+    public SqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+        super(args, output, false);
+    }
+
+    @Override
+    protected void processNull() {
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 7801b53..5ae2459 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -79,21 +79,25 @@
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
@@ -445,9 +449,9 @@
 
         // SQL aggregates
         temp.add(SqlCountAggregateDescriptor.FACTORY);
-//        temp.add(SqlAvgAggregateDescriptor.FACTORY);
-//        temp.add(LocalSqlAvgAggregateDescriptor.FACTORY);
-//        temp.add(GlobalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SqlAvgAggregateDescriptor.FACTORY);
+        temp.add(LocalSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(GlobalSqlAvgAggregateDescriptor.FACTORY);
         temp.add(SqlSumAggregateDescriptor.FACTORY);
         temp.add(LocalSqlSumAggregateDescriptor.FACTORY);
         temp.add(SqlMaxAggregateDescriptor.FACTORY);
@@ -457,7 +461,7 @@
 
         // SQL serializable aggregates
         temp.add(SerializableSqlCountAggregateDescriptor.FACTORY);
-//        temp.add(SerializableSqlAvgAggregateDescriptor.FACTORY);
+        temp.add(SerializableSqlAvgAggregateDescriptor.FACTORY);
         temp.add(SerializableLocalSqlAvgAggregateDescriptor.FACTORY);
         temp.add(SerializableGlobalSqlAvgAggregateDescriptor.FACTORY);
         temp.add(SerializableSqlSumAggregateDescriptor.FACTORY);