Changed avg aggregate functions to deal with system null. All aggregate functions now work properly for empty inputs, with and without local aggregation. Enabled tests.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@530 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 43701fa..1ee67a3 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -36,5 +36,4 @@
 quantifiers/somesat_04.aql
 quantifiers/somesat_05.aql
 quantifiers/everysat_02.aql
-quantifiers/everysat_03.aql
-aggregate/avg_empty_02.aql
+quantifiers/everysat_03.aql
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index e4c4931..06fc5fd 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
@@ -44,8 +43,6 @@
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "global-avg-serial", 1);
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableGlobalAvgAggregateDescriptor();
@@ -119,12 +116,25 @@
                         inputVal.reset();
                         eval.evaluate(tuple);
                         byte[] serBytes = inputVal.getByteArray();
-                        if (serBytes[0] == SER_NULL_TYPE_TAG)
-                            metNull = true;
-                        if (serBytes[0] != SER_RECORD_TYPE_TAG) {
-                            throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                    + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
-                        }
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);                        
+                        switch (typeTag) {
+                            case NULL: {
+                                metNull = true;
+                                break;
+                            }
+                            case SYSTEM_NULL: {
+                                // Ignore and return.
+                                return;
+                            }
+                            case RECORD: {
+                                // Expected.
+                                break;
+                            }
+                            default: {
+                                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                            }
+                        }                        
                         int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
                         if (offset1 == 0) // the sum is null
                             metNull = true;
@@ -146,19 +156,15 @@
                         long globalCount = BufferSerDeUtil.getLong(state, start + 8);
                         boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
 
-                        if (globalCount == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, result);
-                                else {
-                                    aDouble.setValue(globalSum / globalCount);
-                                    doubleSerde.serialize(aDouble, result);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (globalCount == 0 || metNull)
+                                nullSerde.serialize(ANull.NULL, result);
+                            else {
+                                aDouble.setValue(globalSum / globalCount);
+                                doubleSerde.serialize(aDouble, result);
                             }
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
@@ -173,25 +179,21 @@
                             recordEval = new ClosedRecordConstructorEval(recType,
                                     new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
 
-                        if (globalCount == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(globalSum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt64.setValue(globalCount);
-                                longSerde.serialize(aInt64, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (globalCount == 0 || metNull) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(globalSum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
                             }
+                            countBytes.reset();
+                            aInt64.setValue(globalCount);
+                            longSerde.serialize(aInt64, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
                 };
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index f5c9538..5cef25c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -177,27 +176,25 @@
                         if (recordEval == null)
                             recordEval = new ClosedRecordConstructorEval(recType,
                                     new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
-                        if (count == 0) {
-                            if (GlobalConfig.DEBUG) {
-                                GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                        try {
+                            if (count == 0) {
+                                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                                return;
                             }
-                        } else {
-                            try {
-                                if (metNull) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(sum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt64.setValue(count);
-                                int64Serde.serialize(aInt64, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                            if (metNull) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(sum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
                             }
+                            countBytes.reset();
+                            aInt64.setValue(count);
+                            int64Serde.serialize(aInt64, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
index e86b2bc..1e4516e 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
@@ -45,8 +44,6 @@
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-avg",
             1);
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new GlobalAvgAggregateDescriptor();
@@ -117,11 +114,24 @@
                         inputVal.reset();
                         eval.evaluate(tuple);
                         byte[] serBytes = inputVal.getByteArray();
-                        if (serBytes[0] == SER_NULL_TYPE_TAG)
-                            metNull = true;
-                        if (serBytes[0] != SER_RECORD_TYPE_TAG) {
-                            throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                    + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);                        
+                        switch (typeTag) {
+                            case NULL: {
+                                metNull = true;
+                                break;
+                            }
+                            case SYSTEM_NULL: {
+                                // Ignore and return.
+                                return;
+                            }
+                            case RECORD: {
+                                // Expected.
+                                break;
+                            }
+                            default: {
+                                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                            }
                         }
                         int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, false);
                         if (offset1 == 0) // the sum is null
@@ -136,43 +146,35 @@
 
                     @Override
                     public void finish() throws AlgebricksException {
-                        if (globalCount == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, out);
-                                else {
-                                    aDouble.setValue(globalSum / globalCount);
-                                    doubleSerde.serialize(aDouble, out);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (globalCount == 0 || metNull)
+                                nullSerde.serialize(ANull.NULL, out);
+                            else {
+                                aDouble.setValue(globalSum / globalCount);
+                                doubleSerde.serialize(aDouble, out);
                             }
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
                     @Override
                     public void finishPartial() throws AlgebricksException {
-                        if (globalCount == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(globalSum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt32.setValue(globalCount);
-                                intSerde.serialize(aInt32, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (metNull || globalCount == 0) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(globalSum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
                             }
+                            countBytes.reset();
+                            aInt32.setValue(globalCount);
+                            intSerde.serialize(aInt32, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
                 };
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index fa05d4b..2ef543f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -168,27 +167,25 @@
 
                     @Override
                     public void finish() throws AlgebricksException {
-                        if (count == 0) {
-                            if (GlobalConfig.DEBUG) {
-                                GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                        try {
+                            if (count == 0) {
+                                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                                return;
                             }
-                        } else {
-                            try {
-                                if (metNull) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(sum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt32.setValue(count);
-                                int32Serde.serialize(aInt32, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                            if (metNull) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(sum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
                             }
+                            countBytes.reset();
+                            aInt32.setValue(count);
+                            int32Serde.serialize(aInt32, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }