Minor fixes to avg agg functions.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@615 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql b/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
index b53fc877..af39b3f 100644
--- a/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
@@ -1,7 +1,5 @@
 drop dataverse tpch if exists;
 create dataverse tpch;
-  
-
 use dataverse tpch;
 
 create type LineItemType as closed {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 99d28ae..3dea2e1 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -59,8 +59,8 @@
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(ICopyEvaluatorFactory[] args)
-            throws AlgebricksException {
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            ICopyEvaluatorFactory[] args) throws AlgebricksException {
         final ICopyEvaluatorFactory[] evals = args;
         List<IAType> unionList = new ArrayList<IAType>();
         unionList.add(BuiltinType.ANULL);
@@ -103,7 +103,7 @@
                         try {
                             state.writeDouble(0.0);
                             state.writeLong(0);
-                            state.writeBoolean(false);
+                            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
                         } catch (IOException e) {
                             throw new AlgebricksException(e);
                         }
@@ -116,72 +116,77 @@
                         eval.evaluate(tuple);
                         double sum = BufferSerDeUtil.getDouble(state, start);
                         long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                    .deserialize(inputVal.getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute AVG for values of type "
-                                            + typeTag);
-                                }
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+                            aggType = ATypeTag.NULL;
+                            return;
+                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+                            aggType = typeTag;
+                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+                            throw new AlgebricksException("Unexpected type " + typeTag
+                                    + " in aggregation input stream. Expected type " + aggType + ".");
+                        }
+                        ++count;
+                        switch (typeTag) {
+                            case INT8: {
+                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
                             }
-                            inputVal.reset();
+                            case INT16: {
+                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT32: {
+                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT64: {
+                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case FLOAT: {
+                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case DOUBLE: {
+                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case NULL: {
+                                break;
+                            }
+                            default: {
+                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+                            }
                         }
                         BufferSerDeUtil.writeDouble(sum, state, start);
                         BufferSerDeUtil.writeLong(count, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+                        state[start + 16] = aggType.serialize();
                     }
 
                     @Override
                     public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
                         double sum = BufferSerDeUtil.getDouble(state, start);
                         long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-                        if (recordEval == null)
-                            recordEval = new ClosedRecordConstructorEval(recType,
-                                    new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
+                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+                        if (recordEval == null) {
+                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
+                                    evalCount }, avgBytes, result);
+                        }
                         try {
                             if (count == 0) {
                                 result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
                                 return;
                             }
-                            if (metNull) {
+                            if (aggType == ATypeTag.NULL) {
                                 sumBytes.reset();
                                 nullSerde.serialize(ANull.NULL, sumBytesOutput);
                             } else {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index c244d89..c027716 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -65,16 +65,14 @@
         inputVal.reset();
         eval.evaluate(tuple);
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
-        if (typeTag == ATypeTag.NULL) {
+        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
             aggType = ATypeTag.NULL;
-        }
-        if (aggType == ATypeTag.NULL) {
             return;
         } else if (aggType == ATypeTag.SYSTEM_NULL) {
             aggType = typeTag;
         } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
             throw new AlgebricksException("Unexpected type " + typeTag
-                    + " in sum-aggregation input stream. Expected type " + aggType + ".");
+                    + " in aggregation input stream. Expected type " + aggType + ".");
         }
         switch (typeTag) {
             case INT8: {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index bbd0460..5376e57 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -39,7 +39,6 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
@@ -80,9 +79,10 @@
 
                     private DataOutput out = provider.getDataOutput();
                     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
+                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);                    
                     private double sum;
                     private int count;
+                    private ATypeTag aggType;
                     private AMutableDouble aDouble = new AMutableDouble(0);
                     private AMutableInt32 aInt32 = new AMutableInt32(0);
 
@@ -105,10 +105,10 @@
                     @SuppressWarnings("unchecked")
                     private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
                             .getSerializerDeserializer(BuiltinType.ANULL);
-                    private boolean metNull;
 
                     @Override
                     public void init() throws AlgebricksException {
+                        aggType = ATypeTag.SYSTEM_NULL;
                         sum = 0.0;
                         count = 0;
                     }
@@ -117,74 +117,69 @@
                     public void step(IFrameTupleReference tuple) throws AlgebricksException {
                         inputVal.reset();
                         eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute AVG for values of type "
-                                            + typeTag);
-                                }
+                        ++count;
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+                            aggType = ATypeTag.NULL;
+                            return;
+                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+                            aggType = typeTag;
+                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+                            throw new AlgebricksException("Unexpected type " + typeTag
+                                    + " in aggregation input stream. Expected type " + aggType + ".");
+                        }
+                        switch (typeTag) {
+                            case INT8: {
+                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
                             }
-                            inputVal.reset();
+                            case INT16: {
+                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT32: {
+                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT64: {
+                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case FLOAT: {
+                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case DOUBLE: {
+                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case NULL: {
+                                break;
+                            }
+                            default: {
+                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+                            }
                         }
                     }
 
                     @Override
                     public void finish() throws AlgebricksException {
-                        if (count == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                            try {
+                        try {
+                            if (count == 0 || aggType == ATypeTag.NULL) {
                                 nullSerde.serialize(ANull.NULL, out);
-                            } catch (HyracksDataException e) {
-                                throw new AlgebricksException(e);
+                            } else {
+                                aDouble.setValue(sum / count);
+                                doubleSerde.serialize(aDouble, out);
                             }
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, out);
-                                else {
-                                    aDouble.setValue(sum / count);
-                                    doubleSerde.serialize(aDouble, out);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
-                            }
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
@@ -196,7 +191,7 @@
                             }
                         } else {
                             try {
-                                if (metNull) {
+                                if (aggType == ATypeTag.NULL) {
                                     sumBytes.reset();
                                     nullSerde.serialize(ANull.NULL, sumBytesOutput);
                                 } else {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 4870745..fd83725 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -68,13 +68,14 @@
         if (typeTag == ATypeTag.NULL) {
             aggType = ATypeTag.NULL;
         }
-        if (aggType == ATypeTag.NULL) {
+        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+            aggType = ATypeTag.NULL;
             return;
         } else if (aggType == ATypeTag.SYSTEM_NULL) {
             aggType = typeTag;
         } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
             throw new AlgebricksException("Unexpected type " + typeTag
-                    + " in sum-aggregation input stream. Expected type " + aggType + ".");
+                    + " in aggregation input stream. Expected type " + aggType + ".");
         }
         switch (typeTag) {
             case INT8: {