Changed avg aggregate functions to deal with system null. All aggregate functions now work properly for empty inputs, with and without local aggregation. Enabled tests.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@530 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 43701fa..1ee67a3 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -36,5 +36,4 @@
quantifiers/somesat_04.aql
quantifiers/somesat_05.aql
quantifiers/everysat_02.aql
-quantifiers/everysat_03.aql
-aggregate/avg_empty_02.aql
+quantifiers/everysat_03.aql
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index e4c4931..06fc5fd 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
@@ -44,8 +43,6 @@
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"global-avg-serial", 1);
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
- private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableGlobalAvgAggregateDescriptor();
@@ -119,12 +116,25 @@
inputVal.reset();
eval.evaluate(tuple);
byte[] serBytes = inputVal.getByteArray();
- if (serBytes[0] == SER_NULL_TYPE_TAG)
- metNull = true;
- if (serBytes[0] != SER_RECORD_TYPE_TAG) {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
- }
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ switch (typeTag) {
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ return;
+ }
+ case RECORD: {
+ // Expected.
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ }
+ }
int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
if (offset1 == 0) // the sum is null
metNull = true;
@@ -146,19 +156,15 @@
long globalCount = BufferSerDeUtil.getLong(state, start + 8);
boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
- if (globalCount == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ try {
+ if (globalCount == 0 || metNull)
+ nullSerde.serialize(ANull.NULL, result);
+ else {
+ aDouble.setValue(globalSum / globalCount);
+ doubleSerde.serialize(aDouble, result);
}
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
@@ -173,25 +179,21 @@
recordEval = new ClosedRecordConstructorEval(recType,
new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
- if (globalCount == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(globalCount);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ try {
+ if (globalCount == 0 || metNull) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(globalSum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
}
+ countBytes.reset();
+ aInt64.setValue(globalCount);
+ longSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
};
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index f5c9538..5cef25c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -177,27 +176,25 @@
if (recordEval == null)
recordEval = new ClosedRecordConstructorEval(recType,
new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
- if (count == 0) {
- if (GlobalConfig.DEBUG) {
- GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ try {
+ if (count == 0) {
+ result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ return;
}
- } else {
- try {
- if (metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(count);
- int64Serde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ if (metNull) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
}
+ countBytes.reset();
+ aInt64.setValue(count);
+ int64Serde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
index e86b2bc..1e4516e 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
@@ -45,8 +44,6 @@
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-avg",
1);
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
- private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new GlobalAvgAggregateDescriptor();
@@ -117,11 +114,24 @@
inputVal.reset();
eval.evaluate(tuple);
byte[] serBytes = inputVal.getByteArray();
- if (serBytes[0] == SER_NULL_TYPE_TAG)
- metNull = true;
- if (serBytes[0] != SER_RECORD_TYPE_TAG) {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ switch (typeTag) {
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ return;
+ }
+ case RECORD: {
+ // Expected.
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ }
}
int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, false);
if (offset1 == 0) // the sum is null
@@ -136,43 +146,35 @@
@Override
public void finish() throws AlgebricksException {
- if (globalCount == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, out);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, out);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ try {
+ if (globalCount == 0 || metNull)
+ nullSerde.serialize(ANull.NULL, out);
+ else {
+ aDouble.setValue(globalSum / globalCount);
+ doubleSerde.serialize(aDouble, out);
}
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
@Override
public void finishPartial() throws AlgebricksException {
- if (globalCount == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt32.setValue(globalCount);
- intSerde.serialize(aInt32, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ try {
+ if (metNull || globalCount == 0) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(globalSum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
}
+ countBytes.reset();
+ aInt32.setValue(globalCount);
+ intSerde.serialize(aInt32, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
};
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index fa05d4b..2ef543f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -168,27 +167,25 @@
@Override
public void finish() throws AlgebricksException {
- if (count == 0) {
- if (GlobalConfig.DEBUG) {
- GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ try {
+ if (count == 0) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ return;
}
- } else {
- try {
- if (metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt32.setValue(count);
- int32Serde.serialize(aInt32, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ if (metNull) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
}
+ countBytes.reset();
+ aInt32.setValue(count);
+ int32Serde.serialize(aInt32, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}