Restructured the aggregate functions to simplify average and make all abstract classes more consistent.
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
index 8e396ed..4d804ee 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
@@ -15,22 +15,32 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
@@ -38,18 +48,38 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public abstract class AbstractSerializableAvgAggregateFunction implements ICopySerializableAggregateFunction {
+ private static final int SUM_FIELD_ID = 0;
+ private static final int COUNT_FIELD_ID = 1;
+
+ private static final int SUM_OFFSET = 0;
+ private static final int COUNT_OFFSET = 8;
+ protected static final int AGG_TYPE_OFFSET = 16;
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ICopyEvaluator eval;
-
private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+ private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+ private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+ private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+ private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+ private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+ private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+ private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+ private ClosedRecordConstructorEval recordEval;
+
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ADOUBLE);
@SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
@@ -62,102 +92,185 @@
try {
state.writeDouble(0.0);
state.writeLong(0);
- state.writeBoolean(false);
+ state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
} catch (IOException e) {
throw new AlgebricksException(e);
}
}
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ public abstract void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException;
+
+ public abstract void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+ public abstract void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+ protected abstract void processNull(byte[] state, int start);
+
+ protected void processDataValues(IFrameTupleReference tuple, byte[] state, int start, int len)
+ throws AlgebricksException {
+ if (skipStep(state, start)) {
+ return;
+ }
inputVal.reset();
eval.evaluate(tuple);
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- if (typeTag != ATypeTag.NULL) {
- ++count;
- }
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- processNull(state, start);
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
- }
- }
- inputVal.reset();
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull(state, start);
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+ + aggType + ".");
+ } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
}
- BufferSerDeUtil.writeDouble(sum, state, start);
- BufferSerDeUtil.writeLong(count, state, start + 8);
+ ++count;
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+ }
+ }
+ inputVal.reset();
+ BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+ BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+ state[start + AGG_TYPE_OFFSET] = aggType.serialize();
}
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- if (count == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
+ protected void finishPartialResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ if (recordEval == null) {
+ ARecordType recType;
try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(sum / count);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
+ recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+ BuiltinType.AINT64 }, false);
+ } catch (AsterixException e) {
throw new AlgebricksException(e);
}
+ recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount },
+ avgBytes, result);
}
- }
- @Override
- public void finishPartial(byte[] data, int start, int len, DataOutput partialResult) throws AlgebricksException {
try {
- partialResult.write(data, start, len);
+ if (aggType == ATypeTag.SYSTEM_NULL) {
+ if (GlobalConfig.DEBUG) {
+ GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ }
+ result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else if (aggType == ATypeTag.NULL) {
+ result.writeByte(ATypeTag.NULL.serialize());
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ countBytes.reset();
+ aInt64.setValue(count);
+ longSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ }
} catch (IOException e) {
throw new AlgebricksException(e);
}
}
- protected void processNull(byte[] state, int start) {
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
- metNull = true;
- BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+ protected void processPartialResults(IFrameTupleReference tuple, byte[] state, int start, int len)
+ throws AlgebricksException {
+ if (skipStep(state, start)) {
+ return;
+ }
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+ inputVal.reset();
+ eval.evaluate(tuple);
+ byte[] serBytes = inputVal.getByteArray();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ switch (typeTag) {
+ case NULL: {
+ processNull(state, start);
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ break;
+ }
+ case RECORD: {
+ // Expected.
+ int nullBitmapSize = 0;
+ int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+ false);
+ sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+ int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+ nullBitmapSize, false);
+ count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+
+ BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+ BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+ state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ }
+ }
}
+
+ protected void finishFinalResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+ try {
+ if (count == 0 || aggType == ATypeTag.NULL)
+ nullSerde.serialize(ANull.NULL, result);
+ else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, result);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ protected boolean skipStep(byte[] state, int start) {
+ return false;
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
index 9388671..71f709f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
@@ -36,6 +36,9 @@
* count(NULL) returns NULL.
*/
public abstract class AbstractSerializableCountAggregateFunction implements ICopySerializableAggregateFunction {
+ private static final int MET_NULL_OFFSET = 0;
+ private static final int COUNT_OFFSET = 1;
+
private AMutableInt64 result = new AMutableInt64(-1);
@SuppressWarnings("unchecked")
private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
@@ -72,8 +75,8 @@
} else {
cnt++;
}
- BufferSerDeUtil.writeBoolean(metNull, state, start);
- BufferSerDeUtil.writeLong(cnt, state, start + 1);
+ BufferSerDeUtil.writeBoolean(metNull, state, start + MET_NULL_OFFSET);
+ BufferSerDeUtil.writeLong(cnt, state, start + COUNT_OFFSET);
}
@Override
@@ -98,8 +101,6 @@
}
protected void processNull(byte[] state, int start) {
- boolean metNull = BufferSerDeUtil.getBoolean(state, start);
- metNull = true;
- BufferSerDeUtil.writeBoolean(metNull, state, start);
+ BufferSerDeUtil.writeBoolean(true, state, start + MET_NULL_OFFSET);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
index 5ef6f23..68c5b43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
@@ -34,6 +34,7 @@
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
@@ -43,7 +44,10 @@
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class AbstractSerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+public abstract class AbstractSerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+ protected static final int AGG_TYPE_OFFSET = 0;
+ private static final int SUM_OFFSET = 1;
+
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ICopyEvaluator eval;
private AMutableDouble aDouble = new AMutableDouble(0);
@@ -53,13 +57,11 @@
private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
@SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
- private final boolean isLocalAgg;
+ public ISerializerDeserializer serde;
- public AbstractSerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+ public AbstractSerializableSumAggregateFunction(ICopyEvaluatorFactory[] args)
throws AlgebricksException {
eval = args[0].createEvaluator(inputVal);
- this.isLocalAgg = isLocalAgg;
}
@Override
@@ -74,22 +76,28 @@
@Override
public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
- double sum = BufferSerDeUtil.getDouble(state, start + 1);
+ if (skipStep(state, start)) {
+ return;
+ }
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
inputVal.reset();
eval.evaluate(tuple);
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
if (typeTag == ATypeTag.NULL) {
processNull(state, start);
return;
- } else if (aggType == ATypeTag.NULL) {
- return;
} else if (aggType == ATypeTag.SYSTEM_NULL) {
aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
- throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
- + aggType + ".");
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag
+ + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
}
+
+ if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
+ }
+
switch (typeTag) {
case INT8: {
byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
@@ -126,27 +134,22 @@
break;
}
case SYSTEM_NULL: {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- }
+ processSystemNull();
break;
}
default: {
throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
}
}
- state[start] = aggType.serialize();
- BufferSerDeUtil.writeDouble(sum, state, start + 1);
+ state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+ BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
}
@SuppressWarnings("unchecked")
@Override
public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
- double sum = BufferSerDeUtil.getDouble(state, start + 1);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
try {
switch (aggType) {
case INT8: {
@@ -191,20 +194,16 @@
break;
}
case SYSTEM_NULL: {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
+ finishSystemNull(out);
break;
}
+ default:
+ throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+ + aggType + "). ");
}
} catch (IOException e) {
throw new AlgebricksException(e);
}
-
}
@Override
@@ -212,9 +211,11 @@
finish(state, start, len, out);
}
- protected void processNull(byte[] state, int start) {
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
- aggType = ATypeTag.NULL;
- state[start] = aggType.serialize();
+ protected boolean skipStep(byte[] state, int start) {
+ return false;
}
+ protected abstract void processNull(byte[] state, int start);
+ protected abstract void processSystemNull() throws AlgebricksException;
+ protected abstract void finishSystemNull(DataOutput out) throws IOException;
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
index 4f66c54..202933d 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
@@ -14,8 +14,13 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
@@ -23,4 +28,29 @@
super(args);
}
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processDataValues(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishFinalResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finish(state, start, len, result);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+ }
+
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return (aggType == ATypeTag.NULL);
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index 182214a..ccfeb43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -47,7 +47,7 @@
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableGlobalAvgAggregateFunction(args, false);
+ return new SerializableGlobalAvgAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
index 1a69283..c6d4b5a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
@@ -15,13 +15,43 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class SerializableGlobalAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+public class SerializableGlobalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
- super(args, isLocalAgg);
+ public SerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processPartialResults(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishFinalResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishPartialResults(state, start, len, result);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+ }
+
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return (aggType == ATypeTag.NULL);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
index 5314ebe..7fa6b9c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -47,7 +47,7 @@
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableGlobalSqlAvgAggregateFunction(args, false);
+ return new SerializableGlobalSqlAvgAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
index 0821ea0..cc5043f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
@@ -15,17 +15,35 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+import java.io.DataOutput;
+
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
- super(args, isLocalAgg);
+ public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processPartialResults(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishFinalResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishPartialResults(state, start, len, result);
}
@Override
protected void processNull(byte[] state, int start) {
}
-
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 9344f45..c015682 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -46,7 +46,7 @@
private static final long serialVersionUID = 1L;
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableLocalAvgAggregateFunction(args, true);
+ return new SerializableLocalAvgAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
index aed5de2..00835d7 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
@@ -15,17 +15,43 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class SerializableLocalAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+public class SerializableLocalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
- super(args, isLocalAgg);
+ public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
}
@Override
- protected void processNull(byte[] state, int start) {
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processPartialResults(tuple, state, start, len);
}
-
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishPartialResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finish(state, start, len, result);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+ }
+
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return (aggType == ATypeTag.NULL);
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
index f1a9ad8..58415d3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -46,7 +46,7 @@
private static final long serialVersionUID = 1L;
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableLocalSqlAvgAggregateFunction(args, true);
+ return new SerializableLocalSqlAvgAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
index 94c2dd4..0caeadf 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
@@ -15,17 +15,35 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+import java.io.DataOutput;
+
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableGlobalAvgAggregateFunction {
+public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg) throws AlgebricksException {
- super(args, isLocalAgg);
+ public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processPartialResults(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishPartialResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finish(state, start, len, result);
}
@Override
protected void processNull(byte[] state, int start) {
}
-
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
index b922308..c1f77a3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
@@ -14,8 +14,11 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+import java.io.DataOutput;
+
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
@@ -24,7 +27,22 @@
}
@Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processDataValues(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishFinalResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finish(state, start, len, result);
+ }
+
+ @Override
protected void processNull(byte[] state, int start) {
}
-
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
index 9d6e46e..f80ade3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -14,18 +14,49 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+ private final boolean isLocalAgg;
public SerializableSqlSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
throws AlgebricksException {
- super(args, isLocalAgg);
+ super(args);
+ this.isLocalAgg = isLocalAgg;
}
@Override
protected void processNull(byte[] state, int start) {
}
-
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index 6f20e0b..15e546a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -14,14 +14,57 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+ private final boolean isLocalAgg;
public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
throws AlgebricksException {
- super(args, isLocalAgg);
+ super(args);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ protected void processNull(byte[] state, int start) {
+ ATypeTag aggType = ATypeTag.NULL;
+ state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+ }
+
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return (aggType == ATypeTag.NULL);
+ }
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
index d322669..cfc5f1c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -17,8 +17,6 @@
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -37,7 +35,6 @@
import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.om.types.IAType;
@@ -56,17 +53,19 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunction {
+ private static final int SUM_FIELD_ID = 0;
+ private static final int COUNT_FIELD_ID = 1;
+
private final ARecordType recType;
private DataOutput out;
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ICopyEvaluator eval;
- private ATypeTag aggType;
+ protected ATypeTag aggType;
private double sum;
private long count;
private AMutableDouble aDouble = new AMutableDouble(0);
private AMutableInt64 aInt64 = new AMutableInt64(0);
- private final boolean isLocalAgg;
private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
@@ -81,30 +80,21 @@
private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ADOUBLE);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+ private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.AINT64);
@SuppressWarnings("unchecked")
private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
- public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output, boolean isLocalAgg)
+ public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
throws AlgebricksException {
eval = args[0].createEvaluator(inputVal);
out = output.getDataOutput();
- this.isLocalAgg = isLocalAgg;
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
ARecordType tmpRecType;
try {
- if (isLocalAgg) {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
- } else {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
- }
+ tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+ BuiltinType.AINT64 }, false);
} catch (AsterixException e) {
throw new AlgebricksException(e);
}
@@ -121,25 +111,24 @@
count = 0;
}
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ public abstract void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+ public abstract void finish() throws AlgebricksException;
+
+ public abstract void finishPartial() throws AlgebricksException;
+
+ protected abstract void processNull();
+
+ protected void processDataValues(IFrameTupleReference tuple) throws AlgebricksException {
+ if (skipStep()) {
+ return;
+ }
inputVal.reset();
eval.evaluate(tuple);
- byte[] serBytes = inputVal.getByteArray();
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
if (typeTag == ATypeTag.NULL) {
processNull();
return;
- } else if (aggType == ATypeTag.NULL) {
- return;
- } else if (typeTag == ATypeTag.RECORD) {
- // Global aggregate
- if (isLocalAgg) {
- throw new AlgebricksException("Record type can not be processed by in a local-avg operation.");
- } else if (typeTag == ATypeTag.SYSTEM_NULL) {
- // ignore
- return;
- }
} else if (aggType == ATypeTag.SYSTEM_NULL) {
aggType = typeTag;
} else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
@@ -148,11 +137,7 @@
} else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
aggType = typeTag;
}
-
- if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != ATypeTag.RECORD) {
- ++count;
- }
-
+ ++count;
switch (typeTag) {
case INT8: {
byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
@@ -184,33 +169,6 @@
sum += val;
break;
}
- case NULL: {
- break;
- }
- case SYSTEM_NULL: {
- if (isLocalAgg) {
- throw new AlgebricksException("SYSTEM_NULL can not be processed by in a local-avg operation.");
- }
- break;
- }
- case RECORD: {
- // Expected for global aggregate.
- // The record length helps us determine whether the input record fields are nullable.
- int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
- int nullBitmapSize = 1;
- if (recordLength == 29) {
- nullBitmapSize = 0;
- }
- int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize, false);
- if (offset1 == 0) // the sum is null
- aggType = ATypeTag.NULL;
- else
- sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
- int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize, false);
- if (offset2 != 0) // the count is not null
- count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
- break;
- }
default: {
throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
}
@@ -218,70 +176,80 @@
inputVal.reset();
}
- @Override
- public void finish() throws AlgebricksException {
+ protected void finishPartialResults() throws AlgebricksException {
try {
- if (isLocalAgg) {
- if (count == 0 && aggType != ATypeTag.NULL) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- return;
+ // Double check that count 0 is accounted
+ if (aggType == ATypeTag.SYSTEM_NULL) {
+ if (GlobalConfig.DEBUG) {
+ GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
}
- if (aggType == ATypeTag.NULL) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else if (aggType == ATypeTag.NULL) {
+ out.writeByte(ATypeTag.NULL.serialize());
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
countBytes.reset();
aInt64.setValue(count);
- intSerde.serialize(aInt64, countBytesOutput);
+ longSerde.serialize(aInt64, countBytesOutput);
recordEval.evaluate(null);
- } else {
- if (count == 0 || aggType == ATypeTag.NULL) {
- nullSerde.serialize(ANull.NULL, out);
- } else {
- aDouble.setValue(sum / count);
- doubleSerde.serialize(aDouble, out);
- }
}
} catch (IOException e) {
throw new AlgebricksException(e);
}
}
- @Override
- public void finishPartial() throws AlgebricksException {
- if (isLocalAgg) {
- finish();
- } else {
- if (count == 0) {
- if (GlobalConfig.DEBUG) {
- GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
- }
- } else {
- try {
- if (count == 0 || aggType == ATypeTag.NULL) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(count);
- intSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
+ protected void processPartialResults(IFrameTupleReference tuple) throws AlgebricksException {
+ if (skipStep()) {
+ return;
+ }
+ inputVal.reset();
+ eval.evaluate(tuple);
+ byte[] serBytes = inputVal.getByteArray();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ switch (typeTag) {
+ case NULL: {
+ processNull();
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ break;
+ }
+ case RECORD: {
+ // Expected.
+ int nullBitmapSize = 0;
+ int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+ false);
+ sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+ int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+ nullBitmapSize, false);
+ count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
}
}
}
-
- protected void processNull() {
- aggType = ATypeTag.NULL;
+
+ protected void finishFinalResults() throws AlgebricksException {
+ try {
+ if (count == 0 || aggType == ATypeTag.NULL) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, out);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
}
+
+ protected boolean skipStep() {
+ return false;
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
index bae42a9..29e57f5 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
@@ -42,7 +42,7 @@
.getSerializerDeserializer(BuiltinType.AINT64);
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ICopyEvaluator eval;
- private long cnt;
+ protected long cnt;
private DataOutput out;
public AbstractCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
@@ -84,7 +84,5 @@
finish();
}
- protected void processNull() {
- cnt++;
- }
+ protected abstract void processNull();
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index 09c0171..29aba33 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -36,20 +36,18 @@
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
- private DataOutput out;
+ protected DataOutput out;
private ICopyEvaluator eval;
- private ATypeTag aggType;
+ protected ATypeTag aggType;
private IBinaryComparator cmp;
private ITypePromoteComputer tpc;
private final boolean isMin;
- private final boolean isLocalAgg;
- public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
- boolean isLocalAgg) throws AlgebricksException {
+ public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin)
+ throws AlgebricksException {
out = provider.getDataOutput();
eval = args[0].createEvaluator(inputVal);
this.isMin = isMin;
- this.isLocalAgg = isLocalAgg;
}
@Override
@@ -90,11 +88,8 @@
// If a system_null is encountered locally, it would be an error; otherwise if it is seen
// by a global aggregator, it is simple ignored.
if (typeTag == ATypeTag.SYSTEM_NULL) {
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- } else {
- return;
- }
+ processSystemNull();
+ return;
}
if (ATypeHierarchy.canPromote(aggType, typeTag)) {
@@ -153,12 +148,7 @@
break;
}
case SYSTEM_NULL: {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- out.writeByte(ATypeTag.NULL.serialize());
- }
+ finishSystemNull();
break;
}
default: {
@@ -176,7 +166,13 @@
finish();
}
- protected void processNull() {
- aggType = ATypeTag.NULL;
+ protected boolean skipStep() {
+ return false;
}
+
+ protected abstract void processNull();
+
+ protected abstract void processSystemNull() throws AlgebricksException;
+
+ protected abstract void finishSystemNull() throws IOException;
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
index b953f78..1332dd5 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -46,11 +46,11 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunction {
- private DataOutput out;
+ protected DataOutput out;
private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
private ICopyEvaluator eval;
private double sum;
- private ATypeTag aggType;
+ protected ATypeTag aggType;
private AMutableDouble aDouble = new AMutableDouble(0);
private AMutableFloat aFloat = new AMutableFloat(0);
private AMutableInt64 aInt64 = new AMutableInt64(0);
@@ -58,15 +58,12 @@
private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
@SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
+ protected ISerializerDeserializer serde;
- private final boolean isLocalAgg;
-
- public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+ public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider)
throws AlgebricksException {
out = provider.getDataOutput();
eval = args[0].createEvaluator(inputVal);
- this.isLocalAgg = isLocalAgg;
}
@Override
@@ -77,14 +74,15 @@
@Override
public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ if (skipStep()) {
+ return;
+ }
inputVal.reset();
eval.evaluate(tuple);
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
if (typeTag == ATypeTag.NULL) {
processNull();
return;
- } else if (aggType == ATypeTag.NULL) {
- return;
} else if (aggType == ATypeTag.SYSTEM_NULL) {
aggType = typeTag;
} else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
@@ -131,12 +129,7 @@
break;
}
case SYSTEM_NULL: {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- }
+ processSystemNull();
break;
}
default: {
@@ -192,13 +185,7 @@
break;
}
case SYSTEM_NULL: {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
+ finishSystemNull();
break;
}
default:
@@ -215,7 +202,10 @@
finish();
}
- protected void processNull() {
- aggType = ATypeTag.NULL;
+ protected boolean skipStep() {
+ return false;
}
+ protected abstract void processNull();
+ protected abstract void processSystemNull() throws AlgebricksException;
+ protected abstract void finishSystemNull() throws IOException;
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
index 4fc9e37..2470e5a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
@@ -15,14 +15,41 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
+import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class AvgAggregateFunction extends AbstractAvgAggregateFunction {
public AvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
- super(args, output, false);
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processDataValues(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishFinalResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ @Override
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
+
+ @Override
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
index 56afdd6..7c97fc2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
@@ -26,4 +26,9 @@
public CountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
super(args, output);
}
+
+ protected void processNull() {
+ cnt++;
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
index de5185e..a63d4bc 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
@@ -15,14 +15,42 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
+import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class GlobalAvgAggregateFunction extends AbstractAvgAggregateFunction {
public GlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
throws AlgebricksException {
- super(args, output, false);
+ super(args, output);
}
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processPartialResults(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishFinalResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finishPartialResults();
+ }
+
+ @Override
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
+
+ @Override
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
index 4619c4e..02c3c2c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
@@ -18,12 +18,28 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class GlobalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
public GlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
throws AlgebricksException {
- super(args, output, false);
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processPartialResults(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishFinalResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finishPartialResults();
}
@Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
index 9f89c5a..2c7abd4 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
@@ -15,14 +15,42 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
+import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class LocalAvgAggregateFunction extends AbstractAvgAggregateFunction {
public LocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
throws AlgebricksException {
- super(args, output, true);
+ super(args, output);
}
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processDataValues(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishPartialResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ @Override
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
+
+ @Override
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
index 0fca8a9..6a97410 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
@@ -18,15 +18,32 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class LocalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
public LocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
throws AlgebricksException {
- super(args, output, true);
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processDataValues(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishPartialResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
}
@Override
protected void processNull() {
}
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
index 7612c9c..0809701 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -14,14 +14,46 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-public class MinMaxAggregateFunction extends AbstractMinMaxAggregateFunction{
+public class MinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
+ private final boolean isLocalAgg;
public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
boolean isLocalAgg) throws AlgebricksException {
- super(args, provider, isMin, isLocalAgg);
+ super(args, provider, isMin);
+ this.isLocalAgg = isLocalAgg;
}
+
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
+
+ @Override
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
+ }
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @Override
+ protected void finishSystemNull() throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ out.writeByte(ATypeTag.NULL.serialize());
+ }
+ }
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
index 08d6353..11cf13f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
@@ -18,11 +18,27 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
public SqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
- super(args, output, false);
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processDataValues(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishFinalResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
}
@Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
index 2111ccd..56236e3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
@@ -14,18 +14,40 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
public class SqlMinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
+ private final boolean isLocalAgg;
public SqlMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
boolean isLocalAgg) throws AlgebricksException {
- super(args, provider, isMin, isLocalAgg);
+ super(args, provider, isMin);
+ this.isLocalAgg = isLocalAgg;
}
@Override
protected void processNull() {
}
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @Override
+ protected void finishSystemNull() throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ out.writeByte(ATypeTag.NULL.serialize());
+ }
+ }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
index 4bd533d..4418288 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
@@ -14,18 +14,48 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
public class SqlSumAggregateFunction extends AbstractSumAggregateFunction {
-
+ private final boolean isLocalAgg;
+
public SqlSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
throws AlgebricksException {
- super(args, provider, isLocalAgg);
+ super(args, provider);
+ this.isLocalAgg = isLocalAgg;
}
@Override
protected void processNull() {
}
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void finishSystemNull() throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 52ec126..42b0346 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -14,14 +14,54 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
public class SumAggregateFunction extends AbstractSumAggregateFunction {
+ private final boolean isLocalAgg;
public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
throws AlgebricksException {
- super(args, provider, isLocalAgg);
+ super(args, provider);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
+ }
+
+ @Override
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void finishSystemNull() throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
}
}