Minor fixes to avg agg functions.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@615 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql b/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
index b53fc877..af39b3f 100644
--- a/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
@@ -1,7 +1,5 @@
drop dataverse tpch if exists;
create dataverse tpch;
-
-
use dataverse tpch;
create type LineItemType as closed {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 99d28ae..3dea2e1 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -59,8 +59,8 @@
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(ICopyEvaluatorFactory[] args)
- throws AlgebricksException {
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ ICopyEvaluatorFactory[] args) throws AlgebricksException {
final ICopyEvaluatorFactory[] evals = args;
List<IAType> unionList = new ArrayList<IAType>();
unionList.add(BuiltinType.ANULL);
@@ -103,7 +103,7 @@
try {
state.writeDouble(0.0);
state.writeLong(0);
- state.writeBoolean(false);
+ state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
} catch (IOException e) {
throw new AlgebricksException(e);
}
@@ -116,72 +116,77 @@
eval.evaluate(tuple);
double sum = BufferSerDeUtil.getDouble(state, start);
long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
- if (inputVal.getLength() > 0) {
- ++count;
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type "
- + typeTag);
- }
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(inputVal.getByteArray()[0]);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+ if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+ aggType = ATypeTag.NULL;
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+ throw new AlgebricksException("Unexpected type " + typeTag
+ + " in aggregation input stream. Expected type " + aggType + ".");
+ }
+ ++count;
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
}
- inputVal.reset();
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+ }
}
BufferSerDeUtil.writeDouble(sum, state, start);
BufferSerDeUtil.writeLong(count, state, start + 8);
- BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+ state[start + 16] = aggType.serialize();
}
@Override
public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
double sum = BufferSerDeUtil.getDouble(state, start);
long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
- if (recordEval == null)
- recordEval = new ClosedRecordConstructorEval(recType,
- new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+ if (recordEval == null) {
+ recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
+ evalCount }, avgBytes, result);
+ }
try {
if (count == 0) {
result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
return;
}
- if (metNull) {
+ if (aggType == ATypeTag.NULL) {
sumBytes.reset();
nullSerde.serialize(ANull.NULL, sumBytesOutput);
} else {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index c244d89..c027716 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -65,16 +65,14 @@
inputVal.reset();
eval.evaluate(tuple);
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL) {
+ if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
aggType = ATypeTag.NULL;
- }
- if (aggType == ATypeTag.NULL) {
return;
} else if (aggType == ATypeTag.SYSTEM_NULL) {
aggType = typeTag;
} else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
throw new AlgebricksException("Unexpected type " + typeTag
- + " in sum-aggregation input stream. Expected type " + aggType + ".");
+ + " in aggregation input stream. Expected type " + aggType + ".");
}
switch (typeTag) {
case INT8: {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index bbd0460..5376e57 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -39,7 +39,6 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
@@ -80,9 +79,10 @@
private DataOutput out = provider.getDataOutput();
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
+ private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
private double sum;
private int count;
+ private ATypeTag aggType;
private AMutableDouble aDouble = new AMutableDouble(0);
private AMutableInt32 aInt32 = new AMutableInt32(0);
@@ -105,10 +105,10 @@
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
- private boolean metNull;
@Override
public void init() throws AlgebricksException {
+ aggType = ATypeTag.SYSTEM_NULL;
sum = 0.0;
count = 0;
}
@@ -117,74 +117,69 @@
public void step(IFrameTupleReference tuple) throws AlgebricksException {
inputVal.reset();
eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ++count;
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type "
- + typeTag);
- }
+ ++count;
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+ aggType = ATypeTag.NULL;
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+ throw new AlgebricksException("Unexpected type " + typeTag
+ + " in aggregation input stream. Expected type " + aggType + ".");
+ }
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
}
- inputVal.reset();
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+ }
}
}
@Override
public void finish() throws AlgebricksException {
- if (count == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- try {
+ try {
+ if (count == 0 || aggType == ATypeTag.NULL) {
nullSerde.serialize(ANull.NULL, out);
- } catch (HyracksDataException e) {
- throw new AlgebricksException(e);
+ } else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, out);
}
- } else {
- try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, out);
- else {
- aDouble.setValue(sum / count);
- doubleSerde.serialize(aDouble, out);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
@@ -196,7 +191,7 @@
}
} else {
try {
- if (metNull) {
+ if (aggType == ATypeTag.NULL) {
sumBytes.reset();
nullSerde.serialize(ANull.NULL, sumBytesOutput);
} else {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 4870745..fd83725 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -68,13 +68,14 @@
if (typeTag == ATypeTag.NULL) {
aggType = ATypeTag.NULL;
}
- if (aggType == ATypeTag.NULL) {
+ if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+ aggType = ATypeTag.NULL;
return;
} else if (aggType == ATypeTag.SYSTEM_NULL) {
aggType = typeTag;
} else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
throw new AlgebricksException("Unexpected type " + typeTag
- + " in sum-aggregation input stream. Expected type " + aggType + ".");
+ + " in aggregation input stream. Expected type " + aggType + ".");
}
switch (typeTag) {
case INT8: {