Added system null to distinguish between empty stream and null in aggregate functions. Changes sum aggregate function to use system null properly. Other aggregates to follow.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@523 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 1e7ea33..a01af41 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -63,7 +63,8 @@
import edu.uci.ics.hyracks.algebricks.rewriter.rules.InsertProjectBeforeUnionRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroHashPartitionMergeExchange;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroJoinInsideSubplanRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceCombinerRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceAggregateCombinerRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForSubplanRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
@@ -93,9 +94,6 @@
public final static List<IAlgebraicRewriteRule> buildNormalizationRuleCollection() {
List<IAlgebraicRewriteRule> normalization = new LinkedList<IAlgebraicRewriteRule>();
normalization.add(new EliminateSubplanRule());
- // TODO: The IntroduceGroupByForStandaloneAggregRule performs an invalid rewrite, and we've replaced it
- // with PushAggFuncIntoStandaloneAggregateRule. Wee need more testing before we can remove it complete.y
- //normalization.add(new IntroduceGroupByForStandaloneAggregRule());
normalization.add(new PushAggFuncIntoStandaloneAggregateRule());
normalization.add(new BreakSelectIntoConjunctsRule());
normalization.add(new ExtractGbyExpressionsRule());
@@ -169,7 +167,8 @@
consolidation.add(new ConsolidateSelectsRule());
consolidation.add(new ConsolidateAssignsRule());
consolidation.add(new InlineAssignIntoAggregateRule());
- consolidation.add(new IntroduceCombinerRule());
+ consolidation.add(new IntroduceGroupByCombinerRule());
+ consolidation.add(new IntroduceAggregateCombinerRule());
consolidation.add(new CountVarToCountOneRule());
consolidation.add(new IntroduceSelectAccessMethodRule());
consolidation.add(new IntroduceJoinAccessMethodRule());
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 9ebf8d6..f0b62c2 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -177,6 +177,7 @@
public final static FunctionIdentifier AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-avg", 1);
public final static FunctionIdentifier COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-count", 1);
public final static FunctionIdentifier SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sum", 1);
+ public final static FunctionIdentifier LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum", 1);
public final static FunctionIdentifier MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-max", 1);
public final static FunctionIdentifier MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-min", 1);
public final static FunctionIdentifier GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -202,6 +203,8 @@
"count-serial", 1);
public final static FunctionIdentifier SERIAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"sum-serial", 1);
+ public final static FunctionIdentifier SERIAL_LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "local-sum-serial", 1);
public final static FunctionIdentifier SERIAL_GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"global-avg-serial", 1);
public final static FunctionIdentifier SERIAL_LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -488,6 +491,7 @@
add(SERIAL_GLOBAL_AVG, OptionalADoubleTypeComputer.INSTANCE);
add(SERIAL_LOCAL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE);
add(SERIAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
+ add(SERIAL_LOCAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
add(SIMILARITY_JACCARD, AFloatTypeComputer.INSTANCE);
add(SIMILARITY_JACCARD_CHECK, OrderedListOfAnyTypeComputer.INSTANCE);
add(SIMILARITY_JACCARD_SORTED, AFloatTypeComputer.INSTANCE);
@@ -539,6 +543,7 @@
});
add(SUBSTRING, AStringTypeComputer.INSTANCE);
add(SUM, NonTaggedSumTypeComputer.INSTANCE);
+ add(LOCAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
add(SWITCH_CASE, NonTaggedSwitchCaseComputer.INSTANCE);
add(REG_EXP, ABooleanTypeComputer.INSTANCE);
add(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE);
@@ -596,7 +601,7 @@
addGlobalAgg(MIN, MIN);
addAgg(SUM);
- addLocalAgg(SUM, SUM);
+ addLocalAgg(SUM, LOCAL_SUM);
addGlobalAgg(SUM, SUM);
addAgg(LISTIFY);
@@ -605,6 +610,7 @@
addSerialAgg(AVG, SERIAL_AVG);
addSerialAgg(COUNT, SERIAL_COUNT);
addSerialAgg(SUM, SERIAL_SUM);
+ addSerialAgg(LOCAL_SUM, SERIAL_LOCAL_SUM);
addSerialAgg(LOCAL_AVG, SERIAL_LOCAL_AVG);
addSerialAgg(GLOBAL_AVG, SERIAL_GLOBAL_AVG);
@@ -619,7 +625,8 @@
addGlobalAgg(SERIAL_AVG, SERIAL_GLOBAL_AVG);
addAgg(SERIAL_SUM);
- addLocalAgg(SERIAL_SUM, SERIAL_SUM);
+ addAgg(SERIAL_LOCAL_SUM);
+ addLocalAgg(SERIAL_SUM, SERIAL_LOCAL_SUM);
addGlobalAgg(SERIAL_SUM, SERIAL_SUM);
}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
index aac5447..b75c074 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
@@ -23,7 +23,7 @@
FLOAT(11),
DOUBLE(12),
STRING(13),
- NULL(14),
+ NULL(14),
BOOLEAN(15),
DATETIME(16),
DATE(17),
@@ -42,7 +42,8 @@
LINE(30),
POLYGON(31),
CIRCLE(32),
- RECTANGLE(33);
+ RECTANGLE(33),
+ SYSTEM_NULL(34);
private byte value;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
new file mode 100644
index 0000000..d080b27
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "local-sum-serial", 1);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableLocalSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new SerializableSumAggregateFunction(args, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
index e689ab4..b97e35b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
@@ -1,40 +1,14 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -55,182 +29,12 @@
public ICopySerializableAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
- return new ICopySerializableAggregateFunction() {
-
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeDouble(0.0);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- int pos = start;
- boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
- double sum = BufferSerDeUtil.getDouble(state, pos);
-
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
-
- pos = start;
- BufferSerDeUtil.writeBoolean(metInt8s, state, pos++);
- BufferSerDeUtil.writeBoolean(metInt16s, state, pos++);
- BufferSerDeUtil.writeBoolean(metInt32s, state, pos++);
- BufferSerDeUtil.writeBoolean(metInt64s, state, pos++);
- BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
- BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
- BufferSerDeUtil.writeBoolean(metNull, state, pos++);
- BufferSerDeUtil.writeDouble(sum, state, pos);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- int pos = start;
- boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
- double sum = BufferSerDeUtil.getDouble(state, pos);
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(sum);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
- } else {
- GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
- }
-
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput out)
- throws AlgebricksException {
- finish(state, start, len, out);
- }
- };
+ return new SerializableSumAggregateFunction(args, false);
}
};
}
-
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
new file mode 100644
index 0000000..9831c28
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -0,0 +1,229 @@
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+ private final boolean isLocalAgg = false;
+
+ public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+ throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeDouble(0.0);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
+ throws AlgebricksException {
+ int pos = start;
+ boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metSystemNull = BufferSerDeUtil.getBoolean(state, pos++);
+ double sum = BufferSerDeUtil.getDouble(state, pos);
+
+ inputVal.reset();
+ eval.evaluate(tuple);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
+ .getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ metInt8s = true;
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ metInt16s = true;
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ metInt32s = true;
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ metInt64s = true;
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ metFloats = true;
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ metDoubles = true;
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ metSystemNull = true;
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type "
+ + typeTag);
+ }
+ }
+ }
+
+ pos = start;
+ BufferSerDeUtil.writeBoolean(metInt8s, state, pos++);
+ BufferSerDeUtil.writeBoolean(metInt16s, state, pos++);
+ BufferSerDeUtil.writeBoolean(metInt32s, state, pos++);
+ BufferSerDeUtil.writeBoolean(metInt64s, state, pos++);
+ BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
+ BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
+ BufferSerDeUtil.writeBoolean(metNull, state, pos++);
+ BufferSerDeUtil.writeBoolean(metSystemNull, state, pos++);
+ BufferSerDeUtil.writeDouble(sum, state, pos);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ int pos = start;
+ boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metSystemNull = BufferSerDeUtil.getBoolean(state, pos++);
+ double sum = BufferSerDeUtil.getDouble(state, pos);
+ try {
+ if (metNull) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else if (metDoubles) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(sum);
+ serde.serialize(aDouble, out);
+ } else if (metFloats) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue((float) sum);
+ serde.serialize(aFloat, out);
+ } else if (metInt64s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue((long) sum);
+ serde.serialize(aInt64, out);
+ } else if (metInt32s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue((int) sum);
+ serde.serialize(aInt32, out);
+ } else if (metInt16s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue((short) sum);
+ serde.serialize(aInt16, out);
+ } else if (metInt8s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT8);
+ aInt8.setValue((byte) sum);
+ serde.serialize(aInt8, out);
+ } else if (metSystemNull) {
+ // All input values must have been of type system null.
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput out)
+ throws AlgebricksException {
+ finish(state, start, len, out);
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
new file mode 100644
index 0000000..b2f3e57
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum",
+ 1);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SumAggregateFunction(args, provider, true);
+ };
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
index 468def7..ad2880a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
@@ -1,41 +1,15 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -52,7 +26,6 @@
return FID;
}
- @SuppressWarnings("unchecked")
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
@@ -62,148 +35,8 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
- private double sum;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
- @Override
- public void init() {
- metInt8s = false;
- metInt16s = false;
- metInt32s = false;
- metInt64s = false;
- metFloats = false;
- metDoubles = false;
- metNull = false;
- sum = 0.0;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(sum);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
- } else {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
- }
-
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
- };
- }
+ return new SumAggregateFunction(args, provider, false);
+ };
};
}
-
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
new file mode 100644
index 0000000..282d7b3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -0,0 +1,187 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SumAggregateFunction implements ICopyAggregateFunction {
+ private DataOutput out;
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull, metSystemNull;
+ private double sum;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+
+ private final boolean isLocalAgg;
+
+ public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+ throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init() {
+ metInt8s = false;
+ metInt16s = false;
+ metInt32s = false;
+ metInt64s = false;
+ metFloats = false;
+ metDoubles = false;
+ metNull = false;
+ metSystemNull = false;
+ sum = 0.0;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ metInt8s = true;
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ metInt16s = true;
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ metInt32s = true;
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ metInt64s = true;
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ metFloats = true;
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ metDoubles = true;
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ metSystemNull = true;
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ if (metNull) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else if (metDoubles) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(sum);
+ serde.serialize(aDouble, out);
+ } else if (metFloats) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue((float) sum);
+ serde.serialize(aFloat, out);
+ } else if (metInt64s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue((long) sum);
+ serde.serialize(aInt64, out);
+ } else if (metInt32s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue((int) sum);
+ serde.serialize(aInt32, out);
+ } else if (metInt16s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue((short) sum);
+ serde.serialize(aInt16, out);
+ } else if (metInt8s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+ aInt8.setValue((byte) sum);
+ serde.serialize(aInt8, out);
+ } else if (metSystemNull) {
+ // All input values must have been of type system null.
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index e28e59c..eab7ee8 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -45,11 +45,13 @@
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableCountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.SumAggregateDescriptor;
@@ -253,6 +255,7 @@
temp.add(LocalAvgAggregateDescriptor.FACTORY);
temp.add(GlobalAvgAggregateDescriptor.FACTORY);
temp.add(SumAggregateDescriptor.FACTORY);
+ temp.add(LocalSumAggregateDescriptor.FACTORY);
temp.add(MaxAggregateDescriptor.FACTORY);
temp.add(MinAggregateDescriptor.FACTORY);
@@ -262,6 +265,7 @@
temp.add(SerializableLocalAvgAggregateDescriptor.FACTORY);
temp.add(SerializableGlobalAvgAggregateDescriptor.FACTORY);
temp.add(SerializableSumAggregateDescriptor.FACTORY);
+ temp.add(SerializableLocalSumAggregateDescriptor.FACTORY);
// new functions - constructors
temp.add(ABooleanConstructorDescriptor.FACTORY);