Updated the serializable aggregate functions to include sql aggregate functions. Includes changes to make sure only one copy of the base aggregate function is being used.
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
index 72e0045..a215a07 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
@@ -14,37 +14,15 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -62,129 +40,15 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
+ @Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeBoolean(false);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
- if (inputVal.getLength() > 0) {
- ++count;
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type "
- + typeTag);
- }
- }
- inputVal.reset();
- }
- BufferSerDeUtil.writeDouble(sum, state, start);
- BufferSerDeUtil.writeLong(count, state, start + 8);
- BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- if (count == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(sum / count);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- }
-
- @Override
- public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
- throws AlgebricksException {
- try {
- partialResult.write(data, start, len);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- };
+ return new SerializableAvgAggregateFunction(args);
}
};
}
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
index a9b264c..0c9ce06 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
@@ -14,29 +14,15 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
* count(NULL) returns NULL.
@@ -63,68 +49,7 @@
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
- return new ICopySerializableAggregateFunction() {
- private AMutableInt64 result = new AMutableInt64(-1);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeBoolean(false);
- state.writeLong(0);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- boolean metNull = BufferSerDeUtil.getBoolean(state, start);
- long cnt = BufferSerDeUtil.getLong(state, start + 1);
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL) {
- metNull = true;
- } else {
- cnt++;
- }
- BufferSerDeUtil.writeBoolean(metNull, state, start);
- BufferSerDeUtil.writeLong(cnt, state, start + 1);
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- boolean metNull = BufferSerDeUtil.getBoolean(state, start);
- long cnt = BufferSerDeUtil.getLong(state, start + 1);
- try {
- if (metNull) {
- nullSerde.serialize(ANull.NULL, out);
- } else {
- result.setValue(cnt);
- int64Serde.serialize(result, out);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput out)
- throws AlgebricksException {
- finish(state, start, len, out);
- }
- };
+ return new SerializableCountAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index f720434..182214a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -15,44 +15,15 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableGlobalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -70,154 +41,13 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType _recType;
- try {
- _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = _recType;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
- private ClosedRecordConstructorEval recordEval;
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeBoolean(false);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- inputVal.reset();
- eval.evaluate(tuple);
- byte[] serBytes = inputVal.getByteArray();
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
- switch (typeTag) {
- case NULL: {
- metNull = true;
- break;
- }
- case SYSTEM_NULL: {
- // Ignore and return.
- return;
- }
- case RECORD: {
- // Expected.
- break;
- }
- default: {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
- }
- }
- int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
- if (offset1 == 0) // the sum is null
- metNull = true;
- else
- globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
- int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
- if (offset2 != 0) // the count is not null
- globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
- BufferSerDeUtil.writeDouble(globalSum, state, start);
- BufferSerDeUtil.writeLong(globalCount, state, start + 8);
- BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- try {
- if (globalCount == 0 || metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput result)
- throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- if (recordEval == null)
- recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
- evalCount }, avgBytes, result);
-
- try {
- if (globalCount == 0 || metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(globalCount);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- };
+ return new SerializableGlobalAvgAggregateFunction(args, false);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
index 5d93018..5314ebe 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -15,44 +15,15 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -70,154 +41,13 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType _recType;
- try {
- _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = _recType;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
- private ClosedRecordConstructorEval recordEval;
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeBoolean(false);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- inputVal.reset();
- eval.evaluate(tuple);
- byte[] serBytes = inputVal.getByteArray();
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
- switch (typeTag) {
- case NULL: {
- metNull = true;
- break;
- }
- case SYSTEM_NULL: {
- // Ignore and return.
- return;
- }
- case RECORD: {
- // Expected.
- break;
- }
- default: {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
- }
- }
- int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
- if (offset1 == 0) // the sum is null
- metNull = true;
- else
- globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
- int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
- if (offset2 != 0) // the count is not null
- globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
- BufferSerDeUtil.writeDouble(globalSum, state, start);
- BufferSerDeUtil.writeLong(globalCount, state, start + 8);
- BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- try {
- if (globalCount == 0 || metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput result)
- throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- if (recordEval == null)
- recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
- evalCount }, avgBytes, result);
-
- try {
- if (globalCount == 0 || metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(globalCount);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- };
+ return new SerializableGlobalSqlAvgAggregateFunction(args, false);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 219204b..9344f45 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -15,48 +15,15 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -74,163 +41,12 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType tmpRecType;
- try {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = tmpRecType;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
- private ClosedRecordConstructorEval recordEval;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
- throw new AlgebricksException("Unexpected type " + typeTag
- + " in aggregation input stream. Expected type " + aggType + ".");
- }
- ++count;
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
- }
- }
- BufferSerDeUtil.writeDouble(sum, state, start);
- BufferSerDeUtil.writeLong(count, state, start + 8);
- state[start + 16] = aggType.serialize();
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
- if (recordEval == null) {
- recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
- evalCount }, avgBytes, result);
- }
- try {
- if (count == 0) {
- result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- return;
- }
- if (aggType == ATypeTag.NULL) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(count);
- int64Serde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput partialResult)
- throws AlgebricksException {
- finish(state, start, len, partialResult);
- }
-
- };
+ return new SerializableLocalAvgAggregateFunction(args, true);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
index 6c8b6e0..f1a9ad8 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -15,48 +15,15 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -74,163 +41,12 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType tmpRecType;
- try {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = tmpRecType;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
- private ClosedRecordConstructorEval recordEval;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
- throw new AlgebricksException("Unexpected type " + typeTag
- + " in aggregation input stream. Expected type " + aggType + ".");
- }
- ++count;
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
- }
- }
- BufferSerDeUtil.writeDouble(sum, state, start);
- BufferSerDeUtil.writeLong(count, state, start + 8);
- state[start + 16] = aggType.serialize();
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
- if (recordEval == null) {
- recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
- evalCount }, avgBytes, result);
- }
- try {
- if (count == 0) {
- result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- return;
- }
- if (aggType == ATypeTag.NULL) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(count);
- int64Serde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput partialResult)
- throws AlgebricksException {
- finish(state, start, len, partialResult);
- }
-
- };
+ return new SerializableLocalSqlAvgAggregateFunction(args, true);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
index dc91711..2ce85c6 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -27,7 +27,6 @@
public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableLocalSqlSumAggregateDescriptor();
@@ -36,7 +35,7 @@
@Override
public FunctionIdentifier getIdentifier() {
- return FID;
+ return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
}
@Override
@@ -47,7 +46,7 @@
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableSumAggregateFunction(args, true);
+ return new SerializableSqlSumAggregateFunction(args, true);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index 3f3aaff..bd6ead8 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -27,7 +27,6 @@
public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableLocalSumAggregateDescriptor();
@@ -36,7 +35,7 @@
@Override
public FunctionIdentifier getIdentifier() {
- return FID;
+ return AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
}
@Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
index 0ed0f13..786510b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -14,37 +14,15 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -62,128 +40,12 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeBoolean(false);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
- if (inputVal.getLength() > 0) {
- ++count;
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type "
- + typeTag);
- }
- }
- inputVal.reset();
- }
- BufferSerDeUtil.writeDouble(sum, state, start);
- BufferSerDeUtil.writeLong(count, state, start + 8);
- BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- if (count == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(sum / count);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- }
-
- @Override
- public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
- throws AlgebricksException {
- try {
- partialResult.write(data, start, len);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- };
+ return new SerializableSqlAvgAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
index 0e6be6c..5bfd6f2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
@@ -14,29 +14,15 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
* count(NULL) returns NULL.
@@ -63,68 +49,7 @@
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
- return new ICopySerializableAggregateFunction() {
- private AMutableInt64 result = new AMutableInt64(-1);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeBoolean(false);
- state.writeLong(0);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- boolean metNull = BufferSerDeUtil.getBoolean(state, start);
- long cnt = BufferSerDeUtil.getLong(state, start + 1);
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL) {
- metNull = true;
- } else {
- cnt++;
- }
- BufferSerDeUtil.writeBoolean(metNull, state, start);
- BufferSerDeUtil.writeLong(cnt, state, start + 1);
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- boolean metNull = BufferSerDeUtil.getBoolean(state, start);
- long cnt = BufferSerDeUtil.getLong(state, start + 1);
- try {
- if (metNull) {
- nullSerde.serialize(ANull.NULL, out);
- } else {
- result.setValue(cnt);
- int64Serde.serialize(result, out);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput out)
- throws AlgebricksException {
- finish(state, start, len, out);
- }
- };
+ return new SerializableSqlCountAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
index 3a08642..c68b3b3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -46,7 +46,7 @@
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableSumAggregateFunction(args, false);
+ return new SerializableSqlSumAggregateFunction(args, false);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index bea6ab8..6f20e0b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -14,199 +14,14 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
- private final boolean isLocalAgg;
+public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
throws AlgebricksException {
- eval = args[0].createEvaluator(inputVal);
- this.isLocalAgg = isLocalAgg;
+ super(args, isLocalAgg);
}
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- state.writeDouble(0.0);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
- double sum = BufferSerDeUtil.getDouble(state, start + 1);
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
- throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
- + aggType + ".");
- }
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- aggType = typeTag;
- break;
- }
- case SYSTEM_NULL: {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- }
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
- }
- }
- state[start] = aggType.serialize();
- BufferSerDeUtil.writeDouble(sum, state, start + 1);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
- double sum = BufferSerDeUtil.getDouble(state, start + 1);
- try {
- switch (aggType) {
- case INT8: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
- break;
- }
- case INT16: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
- break;
- }
- case INT32: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
- break;
- }
- case INT64: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
- break;
- }
- case FLOAT: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
- break;
- }
- case DOUBLE: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(sum);
- serde.serialize(aDouble, out);
- break;
- }
- case NULL: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- break;
- }
- case SYSTEM_NULL: {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
- break;
- }
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- finish(state, start, len, out);
- }
}