Added local min and max aggregate functions. Changed min and max aggregate functions to handle system null properly. Enabled tests.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@529 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 82051f9..43701fa 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -38,7 +38,3 @@
quantifiers/everysat_02.aql
quantifiers/everysat_03.aql
aggregate/avg_empty_02.aql
-aggregate/min_empty_01.aql
-aggregate/min_empty_02.aql
-aggregate/max_empty_01.aql
-aggregate/max_empty_02.aql
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 8669ef0..2af5d9d 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -236,7 +236,9 @@
public final static FunctionIdentifier SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sum", 1);
public final static FunctionIdentifier LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum", 1);
public final static FunctionIdentifier MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-max", 1);
+ public final static FunctionIdentifier LOCAL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-max", 1);
public final static FunctionIdentifier MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-min", 1);
+ public final static FunctionIdentifier LOCAL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-min", 1);
public final static FunctionIdentifier GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"agg-global-avg", 1);
public final static FunctionIdentifier LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -517,7 +519,9 @@
add(MAKE_FIELD_INDEX_HANDLE, null); // TODO
add(MAKE_FIELD_NAME_HANDLE, null); // TODO
add(MAX, NonTaggedSumTypeComputer.INSTANCE);
+ add(LOCAL_MAX, NonTaggedSumTypeComputer.INSTANCE);
add(MIN, NonTaggedSumTypeComputer.INSTANCE);
+ add(LOCAL_MIN, NonTaggedSumTypeComputer.INSTANCE);
add(NON_EMPTY_STREAM, ABooleanTypeComputer.INSTANCE);
add(NULL_CONSTRUCTOR, ANullTypeComputer.INSTANCE);
add(NUMERIC_UNARY_MINUS, NonTaggedUnaryMinusTypeComputer.INSTANCE);
@@ -676,14 +680,16 @@
addGlobalAgg(COUNT, SUM);
addAgg(MAX);
- addLocalAgg(MAX, MAX);
+ addAgg(LOCAL_MAX);
+ addLocalAgg(MAX, LOCAL_MAX);
addGlobalAgg(MAX, MAX);
addAgg(MIN);
- addLocalAgg(MIN, MIN);
+ addLocalAgg(MIN, LOCAL_MIN);
addGlobalAgg(MIN, MIN);
addAgg(SUM);
+ addAgg(LOCAL_SUM);
addLocalAgg(SUM, LOCAL_SUM);
addGlobalAgg(SUM, SUM);
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index 9831c28..ef5f6d1 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -57,7 +57,6 @@
state.writeBoolean(false);
state.writeBoolean(false);
state.writeBoolean(false);
- state.writeBoolean(false);
state.writeDouble(0.0);
} catch (IOException e) {
throw new AlgebricksException(e);
@@ -75,7 +74,6 @@
boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metSystemNull = BufferSerDeUtil.getBoolean(state, pos++);
double sum = BufferSerDeUtil.getDouble(state, pos);
inputVal.reset();
@@ -125,7 +123,6 @@
break;
}
case SYSTEM_NULL: {
- metSystemNull = true;
// For global aggregates simply ignore system null here,
// but if all input value are system null, then we should return
// null in finish().
@@ -149,7 +146,6 @@
BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
BufferSerDeUtil.writeBoolean(metNull, state, pos++);
- BufferSerDeUtil.writeBoolean(metSystemNull, state, pos++);
BufferSerDeUtil.writeDouble(sum, state, pos);
}
@@ -164,7 +160,6 @@
boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metSystemNull = BufferSerDeUtil.getBoolean(state, pos++);
double sum = BufferSerDeUtil.getDouble(state, pos);
try {
if (metNull) {
@@ -201,10 +196,6 @@
.getSerializerDeserializer(BuiltinType.AINT8);
aInt8.setValue((byte) sum);
serde.serialize(aInt8, out);
- } else if (metSystemNull) {
- // All input values must have been of type system null.
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
} else {
// Empty stream. For local agg return system null. For global agg return null.
if (isLocalAgg) {
@@ -214,7 +205,6 @@
serde.serialize(ANull.NULL, out);
}
}
-
} catch (IOException e) {
throw new AlgebricksException(e);
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
new file mode 100644
index 0000000..11d83b6
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-max",
+ 1);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalMaxAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new MaxAggregateFunction(args, provider, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
new file mode 100644
index 0000000..2fc219f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-min",
+ 1);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalMinAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new MinAggregateFunction(args, provider, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
index e409910..a672b19 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
@@ -1,39 +1,15 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class MaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -50,7 +26,6 @@
return FID;
}
- @SuppressWarnings("unchecked")
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
@@ -60,152 +35,7 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
- private short shortVal = Short.MIN_VALUE;
- private int intVal = Integer.MIN_VALUE;
- private long longVal = Long.MIN_VALUE;
- private float floatVal = Float.MIN_VALUE;
- private double doubleVal = Double.MIN_VALUE;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
- @Override
- public void init() {
- shortVal = Short.MIN_VALUE;
- intVal = Integer.MIN_VALUE;
- longVal = Long.MIN_VALUE;
- floatVal = Float.MIN_VALUE;
- doubleVal = Double.MIN_VALUE;
-
- metInt8s = false;
- metInt16s = false;
- metInt32s = false;
- metInt64s = false;
- metFloats = false;
- metDoubles = false;
- metNull = false;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- throw new NotImplementedException("no implementation for int8's comparator");
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- if (val > shortVal)
- shortVal = val;
- throw new NotImplementedException("no implementation for int16's comparator");
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- if (val > intVal)
- intVal = val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- if (val > longVal)
- longVal = val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- if (val > floatVal)
- floatVal = val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- if (val > doubleVal)
- doubleVal = val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(doubleVal);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue(floatVal);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue(longVal);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue(intVal);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue(shortVal);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- throw new NotImplementedException("no implementation for int8's comparator");
- } else {
- GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
- };
+ return new MaxAggregateFunction(args, provider, false);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
new file mode 100644
index 0000000..3f86c80
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
@@ -0,0 +1,198 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MaxAggregateFunction implements ICopyAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private DataOutput out;
+ private ICopyEvaluator eval;
+ private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
+
+ private short shortVal = Short.MIN_VALUE;
+ private int intVal = Integer.MIN_VALUE;
+ private long longVal = Long.MIN_VALUE;
+ private float floatVal = Float.MIN_VALUE;
+ private double doubleVal = Double.MIN_VALUE;
+
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+ private final boolean isLocalAgg;
+
+ public MaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+ throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init() {
+ shortVal = Short.MIN_VALUE;
+ intVal = Integer.MIN_VALUE;
+ longVal = Long.MIN_VALUE;
+ floatVal = Float.MIN_VALUE;
+ doubleVal = Double.MIN_VALUE;
+
+ metInt8s = false;
+ metInt16s = false;
+ metInt32s = false;
+ metInt64s = false;
+ metFloats = false;
+ metDoubles = false;
+ metNull = false;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
+ .getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ metInt8s = true;
+ throw new NotImplementedException("no implementation for int8's comparator");
+ }
+ case INT16: {
+ metInt16s = true;
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ if (val > shortVal)
+ shortVal = val;
+ throw new NotImplementedException("no implementation for int16's comparator");
+ }
+ case INT32: {
+ metInt32s = true;
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ if (val > intVal)
+ intVal = val;
+ break;
+ }
+ case INT64: {
+ metInt64s = true;
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ if (val > longVal)
+ longVal = val;
+ break;
+ }
+ case FLOAT: {
+ metFloats = true;
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ if (val > floatVal)
+ floatVal = val;
+ break;
+ }
+ case DOUBLE: {
+ metDoubles = true;
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ if (val > doubleVal)
+ doubleVal = val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type "
+ + typeTag);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ if (metNull) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else if (metDoubles) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(doubleVal);
+ serde.serialize(aDouble, out);
+ } else if (metFloats) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue(floatVal);
+ serde.serialize(aFloat, out);
+ } else if (metInt64s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue(longVal);
+ serde.serialize(aInt64, out);
+ } else if (metInt32s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue(intVal);
+ serde.serialize(aInt32, out);
+ } else if (metInt16s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue(shortVal);
+ serde.serialize(aInt16, out);
+ } else if (metInt8s) {
+ throw new NotImplementedException("no implementation for int8's comparator");
+ } else {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
index af54c14..0d2b68b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
@@ -1,39 +1,15 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class MinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -50,7 +26,6 @@
return FID;
}
- @SuppressWarnings("unchecked")
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
@@ -60,154 +35,8 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
- private short shortVal = Short.MAX_VALUE;
- private int intVal = Integer.MAX_VALUE;
- private long longVal = Long.MAX_VALUE;
- private float floatVal = Float.MAX_VALUE;
- private double doubleVal = Double.MAX_VALUE;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
- @Override
- public void init() {
- shortVal = Short.MAX_VALUE;
- intVal = Integer.MAX_VALUE;
- longVal = Long.MAX_VALUE;
- floatVal = Float.MAX_VALUE;
- doubleVal = Double.MAX_VALUE;
-
- metInt8s = false;
- metInt16s = false;
- metInt32s = false;
- metInt64s = false;
- metFloats = false;
- metDoubles = false;
- metNull = false;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- throw new NotImplementedException("no implementation for int8's comparator");
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- if (val < shortVal)
- shortVal = val;
- throw new NotImplementedException("no implementation for int16's comparator");
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- if (val < intVal)
- intVal = val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- if (val < longVal)
- longVal = val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- if (val < floatVal)
- floatVal = val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- if (val < doubleVal)
- doubleVal = val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(doubleVal);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue(floatVal);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue(longVal);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue(intVal);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue(shortVal);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- throw new NotImplementedException("no implementation for int8's comparator");
- } else {
- GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
- };
+ return new MinAggregateFunction(args, provider, false);
}
};
}
-
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
new file mode 100644
index 0000000..0fd2355
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
@@ -0,0 +1,205 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MinAggregateFunction implements ICopyAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private DataOutput out;
+ private ICopyEvaluator eval;
+ private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats,
+ metDoubles, metNull;
+ private short shortVal = Short.MAX_VALUE;
+ private int intVal = Integer.MAX_VALUE;
+ private long longVal = Long.MAX_VALUE;
+ private float floatVal = Float.MAX_VALUE;
+ private double doubleVal = Double.MAX_VALUE;
+
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+ private final boolean isLocalAgg;
+
+ public MinAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+ throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init() {
+ shortVal = Short.MAX_VALUE;
+ intVal = Integer.MAX_VALUE;
+ longVal = Long.MAX_VALUE;
+ floatVal = Float.MAX_VALUE;
+ doubleVal = Double.MAX_VALUE;
+
+ metInt8s = false;
+ metInt16s = false;
+ metInt32s = false;
+ metInt64s = false;
+ metFloats = false;
+ metDoubles = false;
+ metNull = false;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(inputVal.getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ metInt8s = true;
+ throw new NotImplementedException(
+ "no implementation for int8's comparator");
+ }
+ case INT16: {
+ metInt16s = true;
+ short val = AInt16SerializerDeserializer.getShort(
+ inputVal.getByteArray(), 1);
+ if (val < shortVal)
+ shortVal = val;
+ throw new NotImplementedException(
+ "no implementation for int16's comparator");
+ }
+ case INT32: {
+ metInt32s = true;
+ int val = AInt32SerializerDeserializer.getInt(
+ inputVal.getByteArray(), 1);
+ if (val < intVal)
+ intVal = val;
+ break;
+ }
+ case INT64: {
+ metInt64s = true;
+ long val = AInt64SerializerDeserializer.getLong(
+ inputVal.getByteArray(), 1);
+ if (val < longVal)
+ longVal = val;
+ break;
+ }
+ case FLOAT: {
+ metFloats = true;
+ float val = AFloatSerializerDeserializer.getFloat(
+ inputVal.getByteArray(), 1);
+ if (val < floatVal)
+ floatVal = val;
+ break;
+ }
+ case DOUBLE: {
+ metDoubles = true;
+ double val = ADoubleSerializerDeserializer.getDouble(
+ inputVal.getByteArray(), 1);
+ if (val < doubleVal)
+ doubleVal = val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException(
+ "Cannot compute SUM for values of type " + typeTag);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ if (metNull) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else if (metDoubles) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(doubleVal);
+ serde.serialize(aDouble, out);
+ } else if (metFloats) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue(floatVal);
+ serde.serialize(aFloat, out);
+ } else if (metInt64s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue(longVal);
+ serde.serialize(aInt64, out);
+ } else if (metInt32s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue(intVal);
+ serde.serialize(aInt32, out);
+ } else if (metInt16s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue(shortVal);
+ serde.serialize(aInt16, out);
+ } else if (metInt8s) {
+ throw new NotImplementedException(
+ "no implementation for int8's comparator");
+ } else {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 282d7b3..de33431 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -34,7 +34,7 @@
private DataOutput out;
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ICopyEvaluator eval;
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull, metSystemNull;
+ private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
private double sum;
private AMutableDouble aDouble = new AMutableDouble(0);
private AMutableFloat aFloat = new AMutableFloat(0);
@@ -63,7 +63,6 @@
metFloats = false;
metDoubles = false;
metNull = false;
- metSystemNull = false;
sum = 0.0;
}
@@ -115,7 +114,6 @@
break;
}
case SYSTEM_NULL: {
- metSystemNull = true;
// For global aggregates simply ignore system null here,
// but if all input value are system null, then we should return
// null in finish().
@@ -162,10 +160,6 @@
serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
aInt8.setValue((byte) sum);
serde.serialize(aInt8, out);
- } else if (metSystemNull) {
- // All input values must have been of type system null.
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
} else {
// Empty stream. For local agg return system null. For global agg return null.
if (isLocalAgg) {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index e17a4dd..001cf1b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -51,6 +51,8 @@
import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
@@ -82,6 +84,7 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.AnyCollectionMemberDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.ClosedRecordConstructorDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.CodePointToStringDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.ContainsDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedGramTokensDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedWordTokensDescriptor;
@@ -109,10 +112,16 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.LenDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.LikeDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAbsDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAddDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericCeilingDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericDivideDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericFloorDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericModuloDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericMultiplyDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEven2Descriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEvenDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericSubtractDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericUnaryMinusDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.OpenRecordConstructorDescriptor;
@@ -131,33 +140,26 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.SpatialDistanceDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.SpatialIntersectDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.StartsWithDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringConcatDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringEndWithDescrtiptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringEqualDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringJoinDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringLengthDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringLowerCaseDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesWithFlagDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceWithFlagsDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringStartWithDescrtiptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringToCodePointDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.Substring2Descriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringAfterDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringBeforeDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.SwitchCaseDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.UnorderedListConstructorDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.WordTokensDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.YearDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAbsDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericCeilingDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericFloorDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEvenDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEven2Descriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringEqualDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringStartWithDescrtiptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringEndWithDescrtiptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringLowerCaseDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesWithFlagDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceWithFlagsDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringLengthDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.Substring2Descriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringBeforeDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringAfterDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringToCodePointDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.CodePointToStringDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringConcatDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringJoinDescriptor;
import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
import edu.uci.ics.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
@@ -304,7 +306,9 @@
temp.add(SumAggregateDescriptor.FACTORY);
temp.add(LocalSumAggregateDescriptor.FACTORY);
temp.add(MaxAggregateDescriptor.FACTORY);
+ temp.add(LocalMaxAggregateDescriptor.FACTORY);
temp.add(MinAggregateDescriptor.FACTORY);
+ temp.add(LocalMinAggregateDescriptor.FACTORY);
// serializable aggregates
temp.add(SerializableCountAggregateDescriptor.FACTORY);