Updated the serializable aggregate functions to include sql aggregate functions. Includes changes to make sure only one copy of the base aggregate function is being used.
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
new file mode 100644
index 0000000..8e396ed
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSerializableAvgAggregateFunction implements ICopySerializableAggregateFunction {
+
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+
+ public AbstractSerializableAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeDouble(0.0);
+ state.writeLong(0);
+ state.writeBoolean(false);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ double sum = BufferSerDeUtil.getDouble(state, start);
+ long count = BufferSerDeUtil.getLong(state, start + 8);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag != ATypeTag.NULL) {
+ ++count;
+ }
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ processNull(state, start);
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+ }
+ }
+ inputVal.reset();
+ }
+ BufferSerDeUtil.writeDouble(sum, state, start);
+ BufferSerDeUtil.writeLong(count, state, start + 8);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ double sum = BufferSerDeUtil.getDouble(state, start);
+ long count = BufferSerDeUtil.getLong(state, start + 8);
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+
+ if (count == 0) {
+ GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
+ } else {
+ try {
+ if (metNull)
+ nullSerde.serialize(ANull.NULL, result);
+ else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, result);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ }
+
+ @Override
+ public void finishPartial(byte[] data, int start, int len, DataOutput partialResult) throws AlgebricksException {
+ try {
+ partialResult.write(data, start, len);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ protected void processNull(byte[] state, int start) {
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+ metNull = true;
+ BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
new file mode 100644
index 0000000..9388671
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public abstract class AbstractSerializableCountAggregateFunction implements ICopySerializableAggregateFunction {
+ private AMutableInt64 result = new AMutableInt64(-1);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+
+ public AbstractSerializableCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeBoolean(false);
+ state.writeLong(0);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+ long cnt = BufferSerDeUtil.getLong(state, start + 1);
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull(state, start);
+ } else {
+ cnt++;
+ }
+ BufferSerDeUtil.writeBoolean(metNull, state, start);
+ BufferSerDeUtil.writeLong(cnt, state, start + 1);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+ long cnt = BufferSerDeUtil.getLong(state, start + 1);
+ try {
+ if (metNull) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else {
+ result.setValue(cnt);
+ int64Serde.serialize(result, out);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ finish(state, start, len, out);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+ metNull = true;
+ BufferSerDeUtil.writeBoolean(metNull, state, start);
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableGlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..597f50b
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableGlobalAvgAggregateFunction.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSerializableGlobalAvgAggregateFunction implements ICopySerializableAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private final boolean isLocalAgg;
+
+ private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+ private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+ private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+ private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+ private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+ private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+ private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+ private ClosedRecordConstructorEval recordEval;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+
+ public AbstractSerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+ throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeDouble(0.0);
+ state.writeLong(0);
+ state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ double sum = BufferSerDeUtil.getDouble(state, start);
+ long count = BufferSerDeUtil.getLong(state, start + 8);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+ inputVal.reset();
+ eval.evaluate(tuple);
+ byte[] serBytes = inputVal.getByteArray();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull(state, start + 16);
+ return;
+ } else if (aggType == ATypeTag.NULL) {
+ return;
+ } else if (typeTag == ATypeTag.RECORD) {
+ // Global aggregate
+ if (isLocalAgg) {
+ throw new AlgebricksException("Record type can not be processed by in a local-avg operation.");
+ } else if (typeTag == ATypeTag.SYSTEM_NULL) {
+ // ignore
+ return;
+ }
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+ + aggType + ".");
+ } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
+ }
+
+ if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != ATypeTag.RECORD) {
+ ++count;
+ }
+
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ break;
+ }
+ case SYSTEM_NULL: {
+ if (isLocalAgg) {
+ throw new AlgebricksException("SYSTEM_NULL can not be processed by in a local-avg operation.");
+ }
+ break;
+ }
+ case RECORD: {
+ int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
+ if (offset1 == 0) { // the sum is null
+ aggType = ATypeTag.NULL;
+ state[start + 16] = aggType.serialize();
+ } else {
+ sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+ }
+ int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
+ if (offset2 != 0) { // the count is not null
+ count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+ }
+ }
+ BufferSerDeUtil.writeDouble(sum, state, start);
+ BufferSerDeUtil.writeLong(count, state, start + 8);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ double sum = BufferSerDeUtil.getDouble(state, start);
+ long count = BufferSerDeUtil.getLong(state, start + 8);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+ if (isLocalAgg) {
+ if (recordEval == null) {
+ List<IAType> unionList = new ArrayList<IAType>();
+ unionList.add(BuiltinType.ANULL);
+ unionList.add(BuiltinType.ADOUBLE);
+ ARecordType tmpRecType;
+ try {
+ tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+ new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
+ }
+
+ ARecordType recType = tmpRecType;
+ recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount },
+ avgBytes, result);
+ }
+ try {
+ if (count == 0 && aggType != ATypeTag.NULL) {
+ result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ return;
+ }
+ if (aggType == ATypeTag.NULL) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ }
+ countBytes.reset();
+ aInt64.setValue(count);
+ int64Serde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ } else {
+ try {
+ if (count == 0 || aggType == ATypeTag.NULL)
+ nullSerde.serialize(ANull.NULL, result);
+ else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, result);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ if (isLocalAgg) {
+ finish(state, start, len, result);
+ } else {
+ double globalSum = BufferSerDeUtil.getDouble(state, start);
+ long globalCount = BufferSerDeUtil.getLong(state, start + 8);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+
+ if (recordEval == null) {
+ List<IAType> unionList = new ArrayList<IAType>();
+ unionList.add(BuiltinType.ANULL);
+ unionList.add(BuiltinType.ADOUBLE);
+ ARecordType _recType;
+ try {
+ _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+ new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
+ }
+
+ ARecordType recType = _recType;
+ recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount },
+ avgBytes, result);
+ }
+ try {
+ if (globalCount == 0 || aggType == ATypeTag.NULL) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(globalSum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ }
+ countBytes.reset();
+ aInt64.setValue(globalCount);
+ int64Serde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ }
+
+ protected void processNull(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
+ aggType = ATypeTag.NULL;
+ state[start] = aggType.serialize();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
new file mode 100644
index 0000000..5ef6f23
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class AbstractSerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+ private final boolean isLocalAgg;
+
+ public AbstractSerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+ throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ state.writeDouble(0.0);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
+ double sum = BufferSerDeUtil.getDouble(state, start + 1);
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull(state, start);
+ return;
+ } else if (aggType == ATypeTag.NULL) {
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+ throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+ + aggType + ".");
+ }
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ aggType = typeTag;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+ }
+ }
+ state[start] = aggType.serialize();
+ BufferSerDeUtil.writeDouble(sum, state, start + 1);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
+ double sum = BufferSerDeUtil.getDouble(state, start + 1);
+ try {
+ switch (aggType) {
+ case INT8: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+ aInt8.setValue((byte) sum);
+ serde.serialize(aInt8, out);
+ break;
+ }
+ case INT16: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue((short) sum);
+ serde.serialize(aInt16, out);
+ break;
+ }
+ case INT32: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue((int) sum);
+ serde.serialize(aInt32, out);
+ break;
+ }
+ case INT64: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue((long) sum);
+ serde.serialize(aInt64, out);
+ break;
+ }
+ case FLOAT: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue((float) sum);
+ serde.serialize(aFloat, out);
+ break;
+ }
+ case DOUBLE: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(sum);
+ serde.serialize(aDouble, out);
+ break;
+ }
+ case NULL: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ finish(state, start, len, out);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
+ aggType = ATypeTag.NULL;
+ state[start] = aggType.serialize();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
new file mode 100644
index 0000000..4f66c54
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+ public SerializableAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java
new file mode 100644
index 0000000..7a6aacd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
+ public SerializableCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..1a69283
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableGlobalAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+
+ public SerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
+ super(args, isLocalAgg);
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..0821ea0
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+
+ public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
+ super(args, isLocalAgg);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
new file mode 100644
index 0000000..aed5de2
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableLocalAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+
+ public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
+ super(args, isLocalAgg);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..94c2dd4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+
+ public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
+ super(args, isLocalAgg);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..b922308
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+ public SerializableSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
new file mode 100644
index 0000000..525b643
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableSqlCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
+ public SerializableSqlCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
new file mode 100644
index 0000000..9d6e46e
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+ public SerializableSqlSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+ throws AlgebricksException {
+ super(args, isLocalAgg);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+}