Reimplemented min/max aggregate functions to support all totally ordered types that have an IBinaryComparator implementation. Added tests.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@619 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql
index 1d044f8..8eefced 100644
--- a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql
@@ -15,5 +15,7 @@
let $i64 := max([int64("1"), int64("2"), int64("3")])
let $f := max([float("1"), float("2"), float("3")])
let $d := max([double("1"), double("2"), double("3")])
-for $i in [$i8, $i16, $i32, $i64, $f, $d]
+let $s := max(["foo", "bar", "world"])
+let $dt := max([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z")])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql
index 7dcd368..04436bf 100644
--- a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql
@@ -15,5 +15,7 @@
let $i64 := max([int64("1"), int64("2"), int64("3"), null])
let $f := max([float("1"), float("2"), float("3"), null])
let $d := max([double("1"), double("2"), double("3"), null])
-for $i in [$i8, $i16, $i32, $i64, $f, $d]
+let $s := max(["foo", "bar", "world", null])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z"), null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql
index 3089758..04aa735 100644
--- a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql
@@ -15,5 +15,7 @@
let $i64 := min([int64("1"), int64("2"), int64("3")])
let $f := min([float("1"), float("2"), float("3")])
let $d := min([double("1"), double("2"), double("3")])
-for $i in [$i8, $i16, $i32, $i64, $f, $d]
+let $s := min(["foo", "bar", "world"])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z")])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql
index 8601228..523ca26 100644
--- a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql
@@ -15,5 +15,7 @@
let $i64 := min([int64("1"), int64("2"), int64("3"), null])
let $f := min([float("1"), float("2"), float("3"), null])
let $d := min([double("1"), double("2"), double("3"), null])
-for $i in [$i8, $i16, $i32, $i64, $f, $d]
+let $s := min(["foo", "bar", "world", null])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z"), null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm
index a2a1efe..d420c2f 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm
@@ -3,4 +3,6 @@
3
3i64
3.0f
-3.0d
\ No newline at end of file
+3.0d
+"world"
+datetime("2012-03-01T00:00:00.000Z")
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm
index 0800a91..c9f3cb3 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm
@@ -3,4 +3,6 @@
null
null
null
+null
+null
null
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm
index ebfec4e..6acf213 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm
@@ -3,4 +3,6 @@
1
1i64
1.0f
-1.0d
\ No newline at end of file
+1.0d
+"bar"
+datetime("2012-01-01T00:00:00.000Z")
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm
index 0800a91..c9f3cb3 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm
@@ -3,4 +3,6 @@
null
null
null
+null
+null
null
\ No newline at end of file
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
index 93167cf..e4db8da 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
@@ -6,7 +6,6 @@
import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectDescBinaryComparatorFactory;
import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.BooleanBinaryComparatorFactory;
-import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.LongBinaryComparatorFactory;
import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.RectangleBinaryComparatorFactory;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
@@ -15,17 +14,26 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.BytePointable;
import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
import edu.uci.ics.hyracks.data.std.primitive.FloatPointable;
import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
public class AqlBinaryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider, Serializable {
private static final long serialVersionUID = 1L;
public static final AqlBinaryComparatorFactoryProvider INSTANCE = new AqlBinaryComparatorFactoryProvider();
+ public static final PointableBinaryComparatorFactory BYTE_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+ BytePointable.FACTORY);
+ public static final PointableBinaryComparatorFactory SHORT_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+ ShortPointable.FACTORY);
public static final PointableBinaryComparatorFactory INTEGER_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
IntegerPointable.FACTORY);
+ public static final PointableBinaryComparatorFactory LONG_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+ LongPointable.FACTORY);
public static final PointableBinaryComparatorFactory FLOAT_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
FloatPointable.FACTORY);
public static final PointableBinaryComparatorFactory DOUBLE_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
@@ -58,7 +66,11 @@
return anyBinaryComparatorFactory(ascending);
}
IAType aqlType = (IAType) type;
- switch (aqlType.getTypeTag()) {
+ return getBinaryComparatorFactory(aqlType.getTypeTag(), ascending);
+ }
+
+ public IBinaryComparatorFactory getBinaryComparatorFactory(ATypeTag type, boolean ascending) {
+ switch (type) {
case ANY:
case UNION: { // we could do smth better for nullable fields
return anyBinaryComparatorFactory(ascending);
@@ -83,11 +95,17 @@
case BOOLEAN: {
return addOffset(BooleanBinaryComparatorFactory.INSTANCE, ascending);
}
+ case INT8: {
+ return addOffset(BYTE_POINTABLE_INSTANCE, ascending);
+ }
+ case INT16: {
+ return addOffset(SHORT_POINTABLE_INSTANCE, ascending);
+ }
case INT32: {
return addOffset(INTEGER_POINTABLE_INSTANCE, ascending);
}
case INT64: {
- return addOffset(LongBinaryComparatorFactory.INSTANCE, ascending);
+ return addOffset(LONG_POINTABLE_INSTANCE, ascending);
}
case FLOAT: {
return addOffset(FLOAT_POINTABLE_INSTANCE, ascending);
@@ -101,12 +119,14 @@
case RECTANGLE: {
return addOffset(RectangleBinaryComparatorFactory.INSTANCE, ascending);
}
+ case DATE:
+ case TIME:
case DATETIME: {
- return addOffset(ADateTimeAscBinaryComparatorFactory.INSTANCE, ascending);
+ return addOffset(ADateTimeAscBinaryComparatorFactory.INSTANCE, ascending);
}
default: {
throw new NotImplementedException("No binary comparator factory implemented for type "
- + aqlType.getTypeTag() + " .");
+ + type + " .");
}
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index 5376e57..08bb063 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -116,19 +116,21 @@
@Override
public void step(IFrameTupleReference tuple) throws AlgebricksException {
inputVal.reset();
- eval.evaluate(tuple);
- ++count;
+ eval.evaluate(tuple);
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
.deserialize(inputVal.getByteArray()[0]);
if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
aggType = ATypeTag.NULL;
return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
aggType = typeTag;
} else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
throw new AlgebricksException("Unexpected type " + typeTag
+ " in aggregation input stream. Expected type " + aggType + ".");
}
+ if (typeTag != ATypeTag.SYSTEM_NULL) {
+ ++count;
+ }
switch (typeTag) {
case INT8: {
byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index b629c32..02d8c91 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -81,6 +81,7 @@
private DataOutput out = provider.getDataOutput();
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
+ private ATypeTag aggType;
private double sum;
private int count;
@@ -104,65 +105,72 @@
.getSerializerDeserializer(BuiltinType.ANULL);
private AMutableDouble aDouble = new AMutableDouble(0);
private AMutableInt32 aInt32 = new AMutableInt32(0);
- private boolean metNull;
@Override
public void init() {
+ aggType = ATypeTag.SYSTEM_NULL;
sum = 0.0;
count = 0;
- metNull = false;
}
@Override
public void step(IFrameTupleReference tuple) throws AlgebricksException {
inputVal.reset();
eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ++count;
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
- + typeTag);
- }
- }
- inputVal.reset();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+ aggType = ATypeTag.NULL;
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+ throw new AlgebricksException("Unexpected type " + typeTag
+ + " in aggregation input stream. Expected type " + aggType + ".");
}
+ if (typeTag != ATypeTag.SYSTEM_NULL) {
+ ++count;
+ }
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
+ + typeTag);
+ }
+ }
+ inputVal.reset();
}
@Override
@@ -172,7 +180,7 @@
out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
return;
}
- if (metNull) {
+ if (aggType == ATypeTag.NULL) {
sumBytes.reset();
nullSerde.serialize(ANull.NULL, sumBytesOutput);
} else {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
index 4d234f3..59287d5 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
@@ -36,7 +36,7 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
- return new MaxAggregateFunction(args, provider, true);
+ return new MinMaxAggregateFunction(args, provider, false, true);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
index f3c8ea6..ca32e2f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -36,7 +36,7 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
- return new MinAggregateFunction(args, provider, true);
+ return new MinMaxAggregateFunction(args, provider, true, true);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
index e59c19d..18bfe9f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
@@ -35,7 +35,7 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
- return new MaxAggregateFunction(args, provider, false);
+ return new MinMaxAggregateFunction(args, provider, false, false);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
deleted file mode 100644
index df244e1..0000000
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
+++ /dev/null
@@ -1,208 +0,0 @@
-package edu.uci.ics.asterix.runtime.aggregates.std;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class MaxAggregateFunction implements ICopyAggregateFunction {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private DataOutput out;
- private ICopyEvaluator eval;
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
- private byte byteVal = Byte.MIN_VALUE;
- private short shortVal = Short.MIN_VALUE;
- private int intVal = Integer.MIN_VALUE;
- private long longVal = Long.MIN_VALUE;
- private float floatVal = Float.MIN_VALUE;
- private double doubleVal = Double.MIN_VALUE;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
- private final boolean isLocalAgg;
-
- public MaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
- throws AlgebricksException {
- out = provider.getDataOutput();
- eval = args[0].createEvaluator(inputVal);
- this.isLocalAgg = isLocalAgg;
- }
-
- @Override
- public void init() {
- byteVal = Byte.MIN_VALUE;
- shortVal = Short.MIN_VALUE;
- intVal = Integer.MIN_VALUE;
- longVal = Long.MIN_VALUE;
- floatVal = Float.MIN_VALUE;
- doubleVal = Double.MIN_VALUE;
-
- metInt8s = false;
- metInt16s = false;
- metInt32s = false;
- metInt64s = false;
- metFloats = false;
- metDoubles = false;
- metNull = false;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- if (val > byteVal)
- byteVal = val;
- break;
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- if (val > shortVal)
- shortVal = val;
- break;
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- if (val > intVal)
- intVal = val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- if (val > longVal)
- longVal = val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- if (val > floatVal)
- floatVal = val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- if (val > doubleVal)
- doubleVal = val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- case SYSTEM_NULL: {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- }
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(doubleVal);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue(floatVal);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue(longVal);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue(intVal);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue(shortVal);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue(byteVal);
- serde.serialize(aInt8, out);
- } else {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
-}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
index fd9fc0d..0bf6ddc 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
@@ -35,7 +35,7 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
- return new MinAggregateFunction(args, provider, false);
+ return new MinMaxAggregateFunction(args, provider, true, false);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
deleted file mode 100644
index 6581d07..0000000
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package edu.uci.ics.asterix.runtime.aggregates.std;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
-public class MinAggregateFunction implements ICopyAggregateFunction {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private DataOutput out;
- private ICopyEvaluator eval;
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats,
- metDoubles, metNull;
- private byte byteVal = Byte.MAX_VALUE;
- private short shortVal = Short.MAX_VALUE;
- private int intVal = Integer.MAX_VALUE;
- private long longVal = Long.MAX_VALUE;
- private float floatVal = Float.MAX_VALUE;
- private double doubleVal = Double.MAX_VALUE;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
- private final boolean isLocalAgg;
-
- public MinAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
- throws AlgebricksException {
- out = provider.getDataOutput();
- eval = args[0].createEvaluator(inputVal);
- this.isLocalAgg = isLocalAgg;
- }
-
- @Override
- public void init() {
- byteVal = Byte.MAX_VALUE;
- shortVal = Short.MAX_VALUE;
- intVal = Integer.MAX_VALUE;
- longVal = Long.MAX_VALUE;
- floatVal = Float.MAX_VALUE;
- doubleVal = Double.MAX_VALUE;
-
- metInt8s = false;
- metInt16s = false;
- metInt32s = false;
- metInt64s = false;
- metFloats = false;
- metDoubles = false;
- metNull = false;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- if (val < byteVal)
- byteVal = val;
- break;
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(
- inputVal.getByteArray(), 1);
- if (val < shortVal)
- shortVal = val;
- break;
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(
- inputVal.getByteArray(), 1);
- if (val < intVal)
- intVal = val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(
- inputVal.getByteArray(), 1);
- if (val < longVal)
- longVal = val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(
- inputVal.getByteArray(), 1);
- if (val < floatVal)
- floatVal = val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(
- inputVal.getByteArray(), 1);
- if (val < doubleVal)
- doubleVal = val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- case SYSTEM_NULL: {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- }
- break;
- }
- default: {
- throw new NotImplementedException(
- "Cannot compute SUM for values of type " + typeTag);
- }
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(doubleVal);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue(floatVal);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue(longVal);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue(intVal);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue(shortVal);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue(byteVal);
- serde.serialize(aInt8, out);
- } else {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
-}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
new file mode 100644
index 0000000..bc3508f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -0,0 +1,106 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MinMaxAggregateFunction implements ICopyAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
+ private DataOutput out;
+ private ICopyEvaluator eval;
+ private ATypeTag aggType;
+ private IBinaryComparator cmp;
+ private final boolean isMin;
+ private final boolean isLocalAgg;
+
+ public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
+ boolean isLocalAgg) throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ this.isMin = isMin;
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init() {
+ aggType = ATypeTag.SYSTEM_NULL;
+ outputVal.reset();
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+ aggType = ATypeTag.NULL;
+ return;
+ }
+ if (aggType == ATypeTag.SYSTEM_NULL) {
+ if (typeTag == ATypeTag.SYSTEM_NULL) {
+ // Ignore.
+ return;
+ }
+ // First value encountered. Set type, comparator, and initial value.
+ aggType = typeTag;
+ // Set comparator.
+ IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(aggType, isMin);
+ cmp = cmpFactory.createBinaryComparator();
+ // Initialize min value.
+ outputVal.assign(inputVal);
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+ throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+ + aggType + ".");
+ }
+ if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+ outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+ outputVal.assign(inputVal);
+ }
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ switch (aggType) {
+ case NULL: {
+ out.writeByte(ATypeTag.NULL.serialize());
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ out.writeByte(ATypeTag.NULL.serialize());
+ }
+ break;
+ }
+ default: {
+ out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index fd83725..d0e82ce 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -65,9 +65,6 @@
inputVal.reset();
eval.evaluate(tuple);
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- }
if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
aggType = ATypeTag.NULL;
return;