Finished SQL-AVG for aggregate and scalar cases.
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
index d322b98..5d93018 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -65,7 +65,7 @@
@Override
public FunctionIdentifier getIdentifier() {
- return AsterixBuiltinFunctions.SERIAL_GLOBAL_AVG;
+ return AsterixBuiltinFunctions.SERIAL_GLOBAL_SQL_AVG;
}
@Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
index f65b27d..6c8b6e0 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -69,7 +69,7 @@
@Override
public FunctionIdentifier getIdentifier() {
- return AsterixBuiltinFunctions.SERIAL_LOCAL_AVG;
+ return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_AVG;
}
@Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
index 7b2b18d..dc91711 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -27,7 +27,7 @@
public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
+ private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableLocalSqlSumAggregateDescriptor();
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..0ed0f13
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SERIAL_SQL_AVG;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ final ICopyEvaluatorFactory[] evals = args;
+
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new ICopySerializableAggregateFunction() {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
+
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeDouble(0.0);
+ state.writeLong(0);
+ state.writeBoolean(false);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
+ throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ double sum = BufferSerDeUtil.getDouble(state, start);
+ long count = BufferSerDeUtil.getLong(state, start + 8);
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+ if (inputVal.getLength() > 0) {
+ ++count;
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
+ .getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute AVG for values of type "
+ + typeTag);
+ }
+ }
+ inputVal.reset();
+ }
+ BufferSerDeUtil.writeDouble(sum, state, start);
+ BufferSerDeUtil.writeLong(count, state, start + 8);
+ BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ double sum = BufferSerDeUtil.getDouble(state, start);
+ long count = BufferSerDeUtil.getLong(state, start + 8);
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
+
+ if (count == 0) {
+ GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
+ } else {
+ try {
+ if (metNull)
+ nullSerde.serialize(ANull.NULL, result);
+ else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, result);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ }
+
+ @Override
+ public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
+ throws AlgebricksException {
+ try {
+ partialResult.write(data, start, len);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ };
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
index 4c9dd88..3a08642 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -35,7 +35,7 @@
@Override
public FunctionIdentifier getIdentifier() {
- return AsterixBuiltinFunctions.SERIAL_SUM;
+ return AsterixBuiltinFunctions.SERIAL_SQL_SUM;
}
@Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
index 30a0f71..d322669 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -128,7 +128,7 @@
byte[] serBytes = inputVal.getByteArray();
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
if (typeTag == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
+ processNull();
return;
} else if (aggType == ATypeTag.NULL) {
return;
@@ -280,4 +280,8 @@
}
}
}
+
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java
index 253be6d..88b0ebf 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractGlobalAvgAggregateFunction.java
@@ -20,9 +20,14 @@
import java.util.ArrayList;
import java.util.List;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.om.base.ADouble;
@@ -36,9 +41,11 @@
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
@@ -54,10 +61,12 @@
private DataOutput out;
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ICopyEvaluator eval;
- private double globalSum;
- private long globalCount;
+ private ATypeTag aggType;
+ private double sum;
+ private long count;
private AMutableDouble aDouble = new AMutableDouble(0);
private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private final boolean isLocalAgg;
private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
@@ -69,28 +78,33 @@
private ClosedRecordConstructorEval recordEval;
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ADOUBLE);
@SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
- private boolean metNull;
- public AbstractGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
- throws AlgebricksException {
+ public AbstractGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output,
+ boolean isLocalAgg) throws AlgebricksException {
eval = args[0].createEvaluator(inputVal);
out = output.getDataOutput();
+ this.isLocalAgg = isLocalAgg;
List<IAType> unionList = new ArrayList<IAType>();
unionList.add(BuiltinType.ANULL);
unionList.add(BuiltinType.ADOUBLE);
ARecordType tmpRecType;
try {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+ if (isLocalAgg) {
+ tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+ new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
+ } else {
+ tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+ new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+ }
} catch (AsterixException e) {
throw new AlgebricksException(e);
}
@@ -102,9 +116,9 @@
@Override
public void init() {
- globalSum = 0.0;
- globalCount = 0;
- metNull = false;
+ aggType = ATypeTag.SYSTEM_NULL;
+ sum = 0.0;
+ count = 0;
}
@Override
@@ -113,50 +127,119 @@
eval.evaluate(tuple);
byte[] serBytes = inputVal.getByteArray();
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
- switch (typeTag) {
- case NULL: {
- metNull = true;
- break;
- }
- case SYSTEM_NULL: {
- // Ignore and return.
+ if (typeTag == ATypeTag.NULL) {
+ aggType = ATypeTag.NULL;
+ return;
+ } else if (aggType == ATypeTag.NULL) {
+ return;
+ } else if (!isLocalAgg && typeTag == ATypeTag.RECORD) {
+ if (aggType == ATypeTag.SYSTEM_NULL) {
return;
}
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+ + aggType + ".");
+ } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
+ }
+
+ if (typeTag != ATypeTag.SYSTEM_NULL) {
+ ++count;
+ }
+
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ break;
+ }
case RECORD: {
- // Expected.
+ // Expected for global aggregate.
+ if (isLocalAgg) {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ } else {
+ // The record length helps us determine whether the input record fields are nullable.
+ int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
+ int nullBitmapSize = 1;
+ if (recordLength == 29) {
+ nullBitmapSize = 0;
+ }
+ int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
+ if (offset1 == 0) // the sum is null
+ aggType = ATypeTag.NULL;
+ else
+ sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+ int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
+ if (offset2 != 0) // the count is not null
+ count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+ }
break;
}
default: {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
}
}
-
- // The record length helps us determine whether the input record fields are nullable.
- int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
- int nullBitmapSize = 1;
- if (recordLength == 29) {
- nullBitmapSize = 0;
- }
- int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
- if (offset1 == 0) // the sum is null
- metNull = true;
- else
- globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
- int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
- if (offset2 != 0) // the count is not null
- globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
+ inputVal.reset();
}
@Override
public void finish() throws AlgebricksException {
try {
- if (globalCount == 0 || metNull)
- nullSerde.serialize(ANull.NULL, out);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, out);
+ if (isLocalAgg) {
+ if (count == 0 && aggType != ATypeTag.NULL) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ return;
+ }
+ if (aggType == ATypeTag.NULL) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ }
+ countBytes.reset();
+ aInt64.setValue(count);
+ intSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } else {
+ if (count == 0 || aggType == ATypeTag.NULL) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, out);
+ }
}
} catch (IOException e) {
throw new AlgebricksException(e);
@@ -165,22 +248,31 @@
@Override
public void finishPartial() throws AlgebricksException {
- try {
- if (metNull || globalCount == 0) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ if (isLocalAgg) {
+ finish();
+ } else {
+ if (count == 0) {
+ if (GlobalConfig.DEBUG) {
+ GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ }
} else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
+ try {
+ if (count == 0 || aggType == ATypeTag.NULL) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ }
+ countBytes.reset();
+ aInt64.setValue(count);
+ intSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
}
- countBytes.reset();
- aInt64.setValue(globalCount);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
}
}
-
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java
index e950ae7..5be48de 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractLocalAvgAggregateFunction.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -27,6 +28,7 @@
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.om.base.ADouble;
import edu.uci.ics.asterix.om.base.AInt64;
@@ -62,6 +64,9 @@
private ATypeTag aggType;
private double sum;
private long count;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private final boolean isLocalAgg;
private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
@@ -71,30 +76,36 @@
private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
private ClosedRecordConstructorEval recordEval;
+
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ADOUBLE);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+ private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.AINT64);
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private boolean metNull;
- public AbstractLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
- throws AlgebricksException {
+ public AbstractLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output,
+ boolean isLocalAgg) throws AlgebricksException {
eval = args[0].createEvaluator(inputVal);
out = output.getDataOutput();
+ this.isLocalAgg = isLocalAgg;
List<IAType> unionList = new ArrayList<IAType>();
unionList.add(BuiltinType.ANULL);
unionList.add(BuiltinType.ADOUBLE);
ARecordType tmpRecType;
try {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
+ if (isLocalAgg) {
+ tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+ new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
+ } else {
+ tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
+ new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
+ }
} catch (AsterixException e) {
throw new AlgebricksException(e);
}
@@ -109,89 +120,137 @@
aggType = ATypeTag.SYSTEM_NULL;
sum = 0.0;
count = 0;
+ metNull = false;
}
@Override
public void step(IFrameTupleReference tuple) throws AlgebricksException {
inputVal.reset();
eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
- throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
- + aggType + ".");
- } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
- aggType = typeTag;
- }
+ byte[] serBytes = inputVal.getByteArray();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ if (isLocalAgg) {
+ if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+ aggType = ATypeTag.NULL;
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag
+ + " in aggregation input stream. Expected type " + aggType + ".");
+ } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
+ }
- if (typeTag != ATypeTag.SYSTEM_NULL) {
- ++count;
- }
+ if (typeTag != ATypeTag.SYSTEM_NULL) {
+ ++count;
+ }
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type " + typeTag);
+ }
}
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
+ inputVal.reset();
+
+ } else {
+ switch (typeTag) {
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ return;
+ }
+ case RECORD: {
+ // Expected.
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ }
}
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
+
+ // The record length helps us determine whether the input record fields are nullable.
+ int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
+ int nullBitmapSize = 1;
+ if (recordLength == 29) {
+ nullBitmapSize = 0;
}
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type " + typeTag);
- }
+ int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
+ if (offset1 == 0) // the sum is null
+ metNull = true;
+ else
+ sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+ int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
+ if (offset2 != 0) // the count is not null
+ count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
}
- inputVal.reset();
}
@Override
public void finish() throws AlgebricksException {
try {
- if (count == 0 && aggType != ATypeTag.NULL) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- return;
- }
- if (aggType == ATypeTag.NULL) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ if (isLocalAgg) {
+ if (count == 0 && aggType != ATypeTag.NULL) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ return;
+ }
+ if (aggType == ATypeTag.NULL) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ }
+ countBytes.reset();
+ aInt64.setValue(count);
+ intSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
} else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
+ if (count == 0 || metNull) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, out);
+ }
}
- countBytes.reset();
- aInt64.setValue(count);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
} catch (IOException e) {
throw new AlgebricksException(e);
}
@@ -199,6 +258,31 @@
@Override
public void finishPartial() throws AlgebricksException {
- finish();
+ if (isLocalAgg) {
+ finish();
+ } else {
+ if (count == 0) {
+ if (GlobalConfig.DEBUG) {
+ GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ }
+ } else {
+ try {
+ if (metNull || count == 0) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ }
+ countBytes.reset();
+ aInt64.setValue(count);
+ intSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ }
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..0808e81
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class GlobalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new GlobalSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.GLOBAL_SQL_AVG;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new GlobalSqlAvgAggregateFunction(args, provider);
+ }
+ };
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..4619c4e
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class GlobalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public GlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(args, output, false);
+ }
+
+ @Override
+ protected void processNull() {
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..6a1f60e
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.LOCAL_SQL_AVG;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new LocalSqlAvgAggregateFunction(args, provider);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..0fca8a9
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public LocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(args, output, true);
+ }
+
+ @Override
+ protected void processNull() {
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..cf3f3dd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SQL_AVG;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlAvgAggregateFunction(args, provider);
+ }
+ };
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
new file mode 100644
index 0000000..08d6353
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public SqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+ super(args, output, false);
+ }
+
+ @Override
+ protected void processNull() {
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 7801b53..5ae2459 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -79,21 +79,25 @@
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlCountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
@@ -445,9 +449,9 @@
// SQL aggregates
temp.add(SqlCountAggregateDescriptor.FACTORY);
-// temp.add(SqlAvgAggregateDescriptor.FACTORY);
-// temp.add(LocalSqlAvgAggregateDescriptor.FACTORY);
-// temp.add(GlobalSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(SqlAvgAggregateDescriptor.FACTORY);
+ temp.add(LocalSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(GlobalSqlAvgAggregateDescriptor.FACTORY);
temp.add(SqlSumAggregateDescriptor.FACTORY);
temp.add(LocalSqlSumAggregateDescriptor.FACTORY);
temp.add(SqlMaxAggregateDescriptor.FACTORY);
@@ -457,7 +461,7 @@
// SQL serializable aggregates
temp.add(SerializableSqlCountAggregateDescriptor.FACTORY);
-// temp.add(SerializableSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(SerializableSqlAvgAggregateDescriptor.FACTORY);
temp.add(SerializableLocalSqlAvgAggregateDescriptor.FACTORY);
temp.add(SerializableGlobalSqlAvgAggregateDescriptor.FACTORY);
temp.add(SerializableSqlSumAggregateDescriptor.FACTORY);