Merge branch 'master' into ecarm002/aggregate_sql_functions
* master:
fix for issue 712
Fixed a typo in the merge policy parameters.
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 299bbfb..8f0bab3 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -304,6 +304,57 @@
public final static FunctionIdentifier SERIAL_LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"local-avg-serial", 1);
+ // sql aggregate functions
+ public final static FunctionIdentifier SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-sql-avg", 1);
+ public final static FunctionIdentifier SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-sql-count", 1);
+ public final static FunctionIdentifier SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-sql-sum", 1);
+ public final static FunctionIdentifier LOCAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-local-sql-sum", 1);
+ public final static FunctionIdentifier SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-sql-max", 1);
+ public final static FunctionIdentifier LOCAL_SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-local-sql-max", 1);
+ public final static FunctionIdentifier SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-sql-min", 1);
+ public final static FunctionIdentifier LOCAL_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-local-sql-min", 1);
+ public final static FunctionIdentifier GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-global-sql-avg", 1);
+ public final static FunctionIdentifier LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "agg-local-sql-avg", 1);
+
+ public final static FunctionIdentifier SCALAR_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sql-avg", 1);
+ public final static FunctionIdentifier SCALAR_SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sql-count", 1);
+ public final static FunctionIdentifier SCALAR_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sql-sum", 1);
+ public final static FunctionIdentifier SCALAR_SQL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sql-max", 1);
+ public final static FunctionIdentifier SCALAR_SQL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sql-min", 1);
+ public final static FunctionIdentifier SCALAR_GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "global-sql-avg", 1);
+ public final static FunctionIdentifier SCALAR_LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "local-sql-avg", 1);
+
+ // serializable sql aggregate functions
+ public final static FunctionIdentifier SERIAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sql-avg-serial", 1);
+ public final static FunctionIdentifier SERIAL_SQL_COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sql-count-serial", 1);
+ public final static FunctionIdentifier SERIAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "sql-sum-serial", 1);
+ public final static FunctionIdentifier SERIAL_LOCAL_SQL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "local-sql-sum-serial", 1);
+ public final static FunctionIdentifier SERIAL_GLOBAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "global-sql-avg-serial", 1);
+ public final static FunctionIdentifier SERIAL_LOCAL_SQL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "local-sql-avg-serial", 1);
+
public final static FunctionIdentifier SCAN_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"scan-collection", 1);
public final static FunctionIdentifier SUBSET_COLLECTION = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -739,6 +790,29 @@
addFunction(RANGE, AInt32TypeComputer.INSTANCE, true);
addFunction(RECTANGLE_CONSTRUCTOR, OptionalARectangleTypeComputer.INSTANCE, true);
+ // SQL Aggregate Functions
+ addFunction(SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+ addFunction(SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+ addFunction(SQL_MAX, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+ addPrivateFunction(LOCAL_SQL_MAX, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+ addFunction(SQL_MIN, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+ addPrivateFunction(LOCAL_SQL_MIN, NonTaggedMinMaxAggTypeComputer.INSTANCE, true);
+ addFunction(SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+ addPrivateFunction(LOCAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+ addFunction(SCALAR_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+ addFunction(SCALAR_SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+ addPrivateFunction(SCALAR_GLOBAL_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+ addPrivateFunction(SCALAR_LOCAL_SQL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
+ addFunction(SCALAR_SQL_MAX, ScalarVersionOfAggregateResultType.INSTANCE, true);
+ addFunction(SCALAR_SQL_MIN, ScalarVersionOfAggregateResultType.INSTANCE, true);
+ addFunction(SCALAR_SQL_SUM, ScalarVersionOfAggregateResultType.INSTANCE, true);
+ addPrivateFunction(SERIAL_SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+ addPrivateFunction(SERIAL_SQL_COUNT, AInt64TypeComputer.INSTANCE, true);
+ addPrivateFunction(SERIAL_GLOBAL_SQL_AVG, OptionalADoubleTypeComputer.INSTANCE, true);
+ addPrivateFunction(SERIAL_LOCAL_SQL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE, true);
+ addPrivateFunction(SERIAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+ addPrivateFunction(SERIAL_LOCAL_SQL_SUM, NonTaggedNumericAggTypeComputer.INSTANCE, true);
+
addFunction(SCALAR_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
addFunction(SCALAR_COUNT, AInt64TypeComputer.INSTANCE, true);
addPrivateFunction(SCALAR_GLOBAL_AVG, ScalarVersionOfAggregateResultType.INSTANCE, true);
@@ -913,6 +987,16 @@
scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_MAX), getAsterixFunctionInfo(MAX));
scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_MIN), getAsterixFunctionInfo(MIN));
scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SUM), getAsterixFunctionInfo(SUM));
+ // SQL Aggregate Functions
+ scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_AVG), getAsterixFunctionInfo(SQL_AVG));
+ scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_COUNT), getAsterixFunctionInfo(SQL_COUNT));
+ scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_GLOBAL_SQL_AVG),
+ getAsterixFunctionInfo(GLOBAL_SQL_AVG));
+ scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_LOCAL_SQL_AVG),
+ getAsterixFunctionInfo(LOCAL_SQL_AVG));
+ scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_MAX), getAsterixFunctionInfo(SQL_MAX));
+ scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_MIN), getAsterixFunctionInfo(SQL_MIN));
+ scalarToAggregateFunctionMap.put(getAsterixFunctionInfo(SCALAR_SQL_SUM), getAsterixFunctionInfo(SQL_SUM));
}
static {
@@ -964,6 +1048,55 @@
addAgg(SERIAL_LOCAL_SUM);
addLocalAgg(SERIAL_SUM, SERIAL_LOCAL_SUM);
addGlobalAgg(SERIAL_SUM, SERIAL_SUM);
+
+ // SQL Aggregate Functions
+ addAgg(SQL_AVG);
+ addAgg(LOCAL_SQL_AVG);
+ addAgg(GLOBAL_SQL_AVG);
+ addLocalAgg(SQL_AVG, LOCAL_SQL_AVG);
+ addGlobalAgg(SQL_AVG, GLOBAL_SQL_AVG);
+
+ addAgg(SQL_COUNT);
+ addLocalAgg(SQL_COUNT, SQL_COUNT);
+ addGlobalAgg(SQL_COUNT, SQL_SUM);
+
+ addAgg(SQL_MAX);
+ addAgg(LOCAL_SQL_MAX);
+ addLocalAgg(SQL_MAX, LOCAL_SQL_MAX);
+ addGlobalAgg(SQL_MAX, SQL_MAX);
+
+ addAgg(SQL_MIN);
+ addLocalAgg(SQL_MIN, LOCAL_SQL_MIN);
+ addGlobalAgg(SQL_MIN, SQL_MIN);
+
+ addAgg(SQL_SUM);
+ addAgg(LOCAL_SQL_SUM);
+ addLocalAgg(SQL_SUM, LOCAL_SQL_SUM);
+ addGlobalAgg(SQL_SUM, SQL_SUM);
+
+ // SQL serializable aggregate functions
+ addSerialAgg(SQL_AVG, SERIAL_SQL_AVG);
+ addSerialAgg(SQL_COUNT, SERIAL_SQL_COUNT);
+ addSerialAgg(SQL_SUM, SERIAL_SQL_SUM);
+ addSerialAgg(LOCAL_SQL_SUM, SERIAL_LOCAL_SQL_SUM);
+ addSerialAgg(LOCAL_SQL_AVG, SERIAL_LOCAL_SQL_AVG);
+ addSerialAgg(GLOBAL_SQL_AVG, SERIAL_GLOBAL_SQL_AVG);
+
+ addAgg(SERIAL_SQL_COUNT);
+ addLocalAgg(SERIAL_SQL_COUNT, SERIAL_SQL_COUNT);
+ addGlobalAgg(SERIAL_SQL_COUNT, SERIAL_SQL_SUM);
+
+ addAgg(SERIAL_SQL_AVG);
+ addAgg(SERIAL_LOCAL_SQL_AVG);
+ addAgg(SERIAL_GLOBAL_SQL_AVG);
+ addLocalAgg(SERIAL_SQL_AVG, SERIAL_LOCAL_SQL_AVG);
+ addGlobalAgg(SERIAL_SQL_AVG, SERIAL_GLOBAL_SQL_AVG);
+
+ addAgg(SERIAL_SQL_SUM);
+ addAgg(SERIAL_LOCAL_SQL_SUM);
+ addLocalAgg(SERIAL_SQL_SUM, SERIAL_LOCAL_SQL_SUM);
+ addGlobalAgg(SERIAL_SQL_SUM, SERIAL_SQL_SUM);
+
}
static {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..241ee46
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlAvgAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlAvgAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_AVG;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new ScalarSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..1b581e4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlCountAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlCountAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_COUNT;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new ScalarSqlCountAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..f58d578
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMaxAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMaxAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_MAX;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new ScalarSqlMaxAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..da6e38d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlMinAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlMinAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_MIN;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new ScalarSqlMinAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..14aa6b7
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSqlSumAggregateDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSqlSumAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public final static FunctionIdentifier FID = AsterixBuiltinFunctions.SCALAR_SQL_SUM;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new ScalarSqlSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
new file mode 100644
index 0000000..4d804ee
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableAvgAggregateFunction.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSerializableAvgAggregateFunction implements ICopySerializableAggregateFunction {
+ private static final int SUM_FIELD_ID = 0;
+ private static final int COUNT_FIELD_ID = 1;
+
+ private static final int SUM_OFFSET = 0;
+ private static final int COUNT_OFFSET = 8;
+ protected static final int AGG_TYPE_OFFSET = 16;
+
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+ private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+ private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+ private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+ private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+ private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+ private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+ private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+ private ClosedRecordConstructorEval recordEval;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+
+ public AbstractSerializableAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeDouble(0.0);
+ state.writeLong(0);
+ state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ public abstract void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException;
+
+ public abstract void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+ public abstract void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException;
+
+ protected abstract void processNull(byte[] state, int start);
+
+ protected void processDataValues(IFrameTupleReference tuple, byte[] state, int start, int len)
+ throws AlgebricksException {
+ if (skipStep(state, start)) {
+ return;
+ }
+ inputVal.reset();
+ eval.evaluate(tuple);
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull(state, start);
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+ + aggType + ".");
+ } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
+ }
+ ++count;
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+ }
+ }
+ inputVal.reset();
+ BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+ BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+ state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+ }
+
+ protected void finishPartialResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ if (recordEval == null) {
+ ARecordType recType;
+ try {
+ recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+ BuiltinType.AINT64 }, false);
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
+ }
+ recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount },
+ avgBytes, result);
+ }
+
+ try {
+ if (aggType == ATypeTag.SYSTEM_NULL) {
+ if (GlobalConfig.DEBUG) {
+ GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ }
+ result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else if (aggType == ATypeTag.NULL) {
+ result.writeByte(ATypeTag.NULL.serialize());
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ countBytes.reset();
+ aInt64.setValue(count);
+ longSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ protected void processPartialResults(IFrameTupleReference tuple, byte[] state, int start, int len)
+ throws AlgebricksException {
+ if (skipStep(state, start)) {
+ return;
+ }
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+ inputVal.reset();
+ eval.evaluate(tuple);
+ byte[] serBytes = inputVal.getByteArray();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ switch (typeTag) {
+ case NULL: {
+ processNull(state, start);
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ break;
+ }
+ case RECORD: {
+ // Expected.
+ int nullBitmapSize = 0;
+ int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+ false);
+ sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+ int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+ nullBitmapSize, false);
+ count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+
+ BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+ BufferSerDeUtil.writeLong(count, state, start + COUNT_OFFSET);
+ state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ }
+ }
+ }
+
+ protected void finishFinalResults(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ long count = BufferSerDeUtil.getLong(state, start + COUNT_OFFSET);
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+ try {
+ if (count == 0 || aggType == ATypeTag.NULL)
+ nullSerde.serialize(ANull.NULL, result);
+ else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, result);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ protected boolean skipStep(byte[] state, int start) {
+ return false;
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
new file mode 100644
index 0000000..71f709f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableCountAggregateFunction.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public abstract class AbstractSerializableCountAggregateFunction implements ICopySerializableAggregateFunction {
+ private static final int MET_NULL_OFFSET = 0;
+ private static final int COUNT_OFFSET = 1;
+
+ private AMutableInt64 result = new AMutableInt64(-1);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+
+ public AbstractSerializableCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeBoolean(false);
+ state.writeLong(0);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+ long cnt = BufferSerDeUtil.getLong(state, start + 1);
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull(state, start);
+ } else {
+ cnt++;
+ }
+ BufferSerDeUtil.writeBoolean(metNull, state, start + MET_NULL_OFFSET);
+ BufferSerDeUtil.writeLong(cnt, state, start + COUNT_OFFSET);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+ long cnt = BufferSerDeUtil.getLong(state, start + 1);
+ try {
+ if (metNull) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else {
+ result.setValue(cnt);
+ int64Serde.serialize(result, out);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ finish(state, start, len, out);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ BufferSerDeUtil.writeBoolean(true, state, start + MET_NULL_OFFSET);
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
new file mode 100644
index 0000000..68c5b43
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+ protected static final int AGG_TYPE_OFFSET = 0;
+ private static final int SUM_OFFSET = 1;
+
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ @SuppressWarnings("rawtypes")
+ public ISerializerDeserializer serde;
+
+ public AbstractSerializableSumAggregateFunction(ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ state.writeDouble(0.0);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ if (skipStep(state, start)) {
+ return;
+ }
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull(state, start);
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag
+ + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
+ }
+
+ if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
+ }
+
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ aggType = typeTag;
+ break;
+ }
+ case SYSTEM_NULL: {
+ processSystemNull();
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+ }
+ }
+ state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+ BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ try {
+ switch (aggType) {
+ case INT8: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+ aInt8.setValue((byte) sum);
+ serde.serialize(aInt8, out);
+ break;
+ }
+ case INT16: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue((short) sum);
+ serde.serialize(aInt16, out);
+ break;
+ }
+ case INT32: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue((int) sum);
+ serde.serialize(aInt32, out);
+ break;
+ }
+ case INT64: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue((long) sum);
+ serde.serialize(aInt64, out);
+ break;
+ }
+ case FLOAT: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue((float) sum);
+ serde.serialize(aFloat, out);
+ break;
+ }
+ case DOUBLE: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(sum);
+ serde.serialize(aDouble, out);
+ break;
+ }
+ case NULL: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ break;
+ }
+ case SYSTEM_NULL: {
+ finishSystemNull(out);
+ break;
+ }
+ default:
+ throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+ + aggType + "). ");
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ finish(state, start, len, out);
+ }
+
+ protected boolean skipStep(byte[] state, int start) {
+ return false;
+ }
+ protected abstract void processNull(byte[] state, int start);
+ protected abstract void processSystemNull() throws AlgebricksException;
+ protected abstract void finishSystemNull(DataOutput out) throws IOException;
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
index 72e0045..a215a07 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateDescriptor.java
@@ -14,37 +14,15 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -62,129 +40,15 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
+ @Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeBoolean(false);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
- if (inputVal.getLength() > 0) {
- ++count;
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type "
- + typeTag);
- }
- }
- inputVal.reset();
- }
- BufferSerDeUtil.writeDouble(sum, state, start);
- BufferSerDeUtil.writeLong(count, state, start + 8);
- BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- if (count == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(sum / count);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- }
-
- @Override
- public void finishPartial(byte[] data, int start, int len, DataOutput partialResult)
- throws AlgebricksException {
- try {
- partialResult.write(data, start, len);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- };
+ return new SerializableAvgAggregateFunction(args);
}
};
}
+
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
new file mode 100644
index 0000000..202933d
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+ public SerializableAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processDataValues(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishFinalResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finish(state, start, len, result);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+ }
+
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return (aggType == ATypeTag.NULL);
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
index a9b264c..0c9ce06 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
@@ -14,29 +14,15 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
* count(NULL) returns NULL.
@@ -63,68 +49,7 @@
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
- return new ICopySerializableAggregateFunction() {
- private AMutableInt64 result = new AMutableInt64(-1);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeBoolean(false);
- state.writeLong(0);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- boolean metNull = BufferSerDeUtil.getBoolean(state, start);
- long cnt = BufferSerDeUtil.getLong(state, start + 1);
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL) {
- metNull = true;
- } else {
- cnt++;
- }
- BufferSerDeUtil.writeBoolean(metNull, state, start);
- BufferSerDeUtil.writeLong(cnt, state, start + 1);
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- boolean metNull = BufferSerDeUtil.getBoolean(state, start);
- long cnt = BufferSerDeUtil.getLong(state, start + 1);
- try {
- if (metNull) {
- nullSerde.serialize(ANull.NULL, out);
- } else {
- result.setValue(cnt);
- int64Serde.serialize(result, out);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput out)
- throws AlgebricksException {
- finish(state, start, len, out);
- }
- };
+ return new SerializableCountAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java
new file mode 100644
index 0000000..7a6aacd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
+ public SerializableCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index f720434..ccfeb43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -15,44 +15,15 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableGlobalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -70,154 +41,13 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType _recType;
- try {
- _recType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = _recType;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
- private ClosedRecordConstructorEval recordEval;
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeBoolean(false);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- inputVal.reset();
- eval.evaluate(tuple);
- byte[] serBytes = inputVal.getByteArray();
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
- switch (typeTag) {
- case NULL: {
- metNull = true;
- break;
- }
- case SYSTEM_NULL: {
- // Ignore and return.
- return;
- }
- case RECORD: {
- // Expected.
- break;
- }
- default: {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
- }
- }
- int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
- if (offset1 == 0) // the sum is null
- metNull = true;
- else
- globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
- int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, 1, true);
- if (offset2 != 0) // the count is not null
- globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
- BufferSerDeUtil.writeDouble(globalSum, state, start);
- BufferSerDeUtil.writeLong(globalCount, state, start + 8);
- BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
-
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- try {
- if (globalCount == 0 || metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput result)
- throws AlgebricksException {
- double globalSum = BufferSerDeUtil.getDouble(state, start);
- long globalCount = BufferSerDeUtil.getLong(state, start + 8);
- boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-
- if (recordEval == null)
- recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
- evalCount }, avgBytes, result);
-
- try {
- if (globalCount == 0 || metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(globalCount);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- };
+ return new SerializableGlobalAvgAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..c6d4b5a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableGlobalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+ public SerializableGlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processPartialResults(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishFinalResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishPartialResults(state, start, len, result);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+ }
+
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return (aggType == ATypeTag.NULL);
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..7fa6b9c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableGlobalSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SERIAL_GLOBAL_SQL_AVG;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new SerializableGlobalSqlAvgAggregateFunction(args);
+ }
+ };
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..cc5043f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+ public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processPartialResults(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishFinalResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishPartialResults(state, start, len, result);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index 219204b..c015682 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -15,48 +15,15 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -74,163 +41,12 @@
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- ICopyEvaluatorFactory[] args) throws AlgebricksException {
- final ICopyEvaluatorFactory[] evals = args;
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType tmpRecType;
- try {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = tmpRecType;
-
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new ICopySerializableAggregateFunction() {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = evals[0].createEvaluator(inputVal);
- private ClosedRecordConstructorEval recordEval;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeDouble(0.0);
- state.writeLong(0);
- state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
- throw new AlgebricksException("Unexpected type " + typeTag
- + " in aggregation input stream. Expected type " + aggType + ".");
- }
- ++count;
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
- }
- }
- BufferSerDeUtil.writeDouble(sum, state, start);
- BufferSerDeUtil.writeLong(count, state, start + 8);
- state[start + 16] = aggType.serialize();
- }
-
- @Override
- public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
- double sum = BufferSerDeUtil.getDouble(state, start);
- long count = BufferSerDeUtil.getLong(state, start + 8);
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
- if (recordEval == null) {
- recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
- evalCount }, avgBytes, result);
- }
- try {
- if (count == 0) {
- result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- return;
- }
- if (aggType == ATypeTag.NULL) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(count);
- int64Serde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput partialResult)
- throws AlgebricksException {
- finish(state, start, len, partialResult);
- }
-
- };
+ return new SerializableLocalAvgAggregateFunction(args);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
new file mode 100644
index 0000000..00835d7
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableLocalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+ public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processPartialResults(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishPartialResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finish(state, start, len, result);
+ }
+
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.NULL.serialize();
+ }
+
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return (aggType == ATypeTag.NULL);
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..58415d3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableLocalSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_AVG;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new SerializableLocalSqlAvgAggregateFunction(args);
+ }
+ };
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..0caeadf
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+ public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processPartialResults(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishPartialResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finish(state, start, len, result);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..2ce85c6
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableLocalSqlSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SERIAL_LOCAL_SQL_SUM;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new SerializableSqlSumAggregateFunction(args, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index 3f3aaff..bd6ead8 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -27,7 +27,6 @@
public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- private final static FunctionIdentifier FID = AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableLocalSumAggregateDescriptor();
@@ -36,7 +35,7 @@
@Override
public FunctionIdentifier getIdentifier() {
- return FID;
+ return AsterixBuiltinFunctions.SERIAL_LOCAL_SUM;
}
@Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..786510b
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SERIAL_SQL_AVG;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new SerializableSqlAvgAggregateFunction(args);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..c1f77a3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
+
+ public SerializableSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+ processDataValues(tuple, state, start, len);
+ }
+
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finishFinalResults(state, start, len, result);
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
+ finish(state, start, len, result);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..5bfd6f2
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableSqlCountAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableSqlCountAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SERIAL_SQL_COUNT;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new SerializableSqlCountAggregateFunction(args);
+ }
+ };
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
new file mode 100644
index 0000000..525b643
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class SerializableSqlCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
+ public SerializableSqlCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ super(args);
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..c68b3b3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableSqlSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SERIAL_SQL_SUM;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+ final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new SerializableSqlSumAggregateFunction(args, false);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
new file mode 100644
index 0000000..f80ade3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+
+public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+ private final boolean isLocalAgg;
+
+ public SerializableSqlSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+ throws AlgebricksException {
+ super(args);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ protected void processNull(byte[] state, int start) {
+ }
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index bea6ab8..15e546a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -17,196 +17,54 @@
import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
+public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
private final boolean isLocalAgg;
public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
throws AlgebricksException {
- eval = args[0].createEvaluator(inputVal);
+ super(args);
this.isLocalAgg = isLocalAgg;
}
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- state.writeDouble(0.0);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
+ protected void processNull(byte[] state, int start) {
+ ATypeTag aggType = ATypeTag.NULL;
+ state[start + AGG_TYPE_OFFSET] = aggType.serialize();
}
@Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
- double sum = BufferSerDeUtil.getDouble(state, start + 1);
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
- throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
- + aggType + ".");
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return (aggType == ATypeTag.NULL);
+ }
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
}
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- aggType = typeTag;
- break;
- }
- case SYSTEM_NULL: {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- }
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
- }
- }
- state[start] = aggType.serialize();
- BufferSerDeUtil.writeDouble(sum, state, start + 1);
}
@SuppressWarnings("unchecked")
@Override
- public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
- double sum = BufferSerDeUtil.getDouble(state, start + 1);
- try {
- switch (aggType) {
- case INT8: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
- break;
- }
- case INT16: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
- break;
- }
- case INT32: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
- break;
- }
- case INT64: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
- break;
- }
- case FLOAT: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
- break;
- }
- case DOUBLE: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(sum);
- serde.serialize(aDouble, out);
- break;
- }
- case NULL: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- break;
- }
- case SYSTEM_NULL: {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
- break;
- }
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
}
-
}
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- finish(state, start, len, out);
- }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
new file mode 100644
index 0000000..cfc5f1c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
+import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunction {
+ private static final int SUM_FIELD_ID = 0;
+ private static final int COUNT_FIELD_ID = 1;
+
+ private final ARecordType recType;
+
+ private DataOutput out;
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ protected ATypeTag aggType;
+ private double sum;
+ private long count;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+
+ private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+ private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
+ private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
+ private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
+ private DataOutput countBytesOutput = new DataOutputStream(countBytes);
+ private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
+ private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+ private ClosedRecordConstructorEval recordEval;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+
+ public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ out = output.getDataOutput();
+
+ ARecordType tmpRecType;
+ try {
+ tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] { BuiltinType.ADOUBLE,
+ BuiltinType.AINT64 }, false);
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
+ }
+
+ recType = tmpRecType;
+ recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount }, avgBytes,
+ out);
+ }
+
+ @Override
+ public void init() {
+ aggType = ATypeTag.SYSTEM_NULL;
+ sum = 0.0;
+ count = 0;
+ }
+
+ public abstract void step(IFrameTupleReference tuple) throws AlgebricksException;
+
+ public abstract void finish() throws AlgebricksException;
+
+ public abstract void finishPartial() throws AlgebricksException;
+
+ protected abstract void processNull();
+
+ protected void processDataValues(IFrameTupleReference tuple) throws AlgebricksException {
+ if (skipStep()) {
+ return;
+ }
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull();
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+ + aggType + ".");
+ } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
+ }
+ ++count;
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+ }
+ }
+ inputVal.reset();
+ }
+
+ protected void finishPartialResults() throws AlgebricksException {
+ try {
+ // Double check that count 0 is accounted
+ if (aggType == ATypeTag.SYSTEM_NULL) {
+ if (GlobalConfig.DEBUG) {
+ GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ }
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else if (aggType == ATypeTag.NULL) {
+ out.writeByte(ATypeTag.NULL.serialize());
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
+ countBytes.reset();
+ aInt64.setValue(count);
+ longSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ protected void processPartialResults(IFrameTupleReference tuple) throws AlgebricksException {
+ if (skipStep()) {
+ return;
+ }
+ inputVal.reset();
+ eval.evaluate(tuple);
+ byte[] serBytes = inputVal.getByteArray();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ switch (typeTag) {
+ case NULL: {
+ processNull();
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ break;
+ }
+ case RECORD: {
+ // Expected.
+ int nullBitmapSize = 0;
+ int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
+ false);
+ sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
+ int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID,
+ nullBitmapSize, false);
+ count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ }
+ }
+ }
+
+ protected void finishFinalResults() throws AlgebricksException {
+ try {
+ if (count == 0 || aggType == ATypeTag.NULL) {
+ nullSerde.serialize(ANull.NULL, out);
+ } else {
+ aDouble.setValue(sum / count);
+ doubleSerde.serialize(aDouble, out);
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ protected boolean skipStep() {
+ return false;
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
new file mode 100644
index 0000000..29e57f5
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
+ */
+public abstract class AbstractCountAggregateFunction implements ICopyAggregateFunction {
+ private AMutableInt64 result = new AMutableInt64(-1);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ protected long cnt;
+ private DataOutput out;
+
+ public AbstractCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ out = output.getDataOutput();
+ }
+
+ @Override
+ public void init() {
+ cnt = 0;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ // Ignore SYSTEM_NULL.
+ if (typeTag == ATypeTag.NULL) {
+ processNull();
+ } else if (typeTag != ATypeTag.SYSTEM_NULL) {
+ cnt++;
+ }
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ result.setValue(cnt);
+ int64Serde.serialize(result, out);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ protected abstract void processNull();
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
new file mode 100644
index 0000000..29aba33
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.asterix.om.types.hierachy.ITypePromoteComputer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
+ private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
+ protected DataOutput out;
+ private ICopyEvaluator eval;
+ protected ATypeTag aggType;
+ private IBinaryComparator cmp;
+ private ITypePromoteComputer tpc;
+ private final boolean isMin;
+
+ public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin)
+ throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ this.isMin = isMin;
+ }
+
+ @Override
+ public void init() {
+ aggType = ATypeTag.SYSTEM_NULL;
+ outputVal.reset();
+ tempValForCasting.reset();
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull();
+ return;
+ } else if (aggType == ATypeTag.NULL) {
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ if (typeTag == ATypeTag.SYSTEM_NULL) {
+ // Ignore.
+ return;
+ }
+ // First value encountered. Set type, comparator, and initial value.
+ aggType = typeTag;
+ // Set comparator.
+ IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(aggType, isMin);
+ cmp = cmpFactory.createBinaryComparator();
+ // Initialize min value.
+ outputVal.assign(inputVal);
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+ + aggType + ".");
+ } else {
+
+ // If a system_null is encountered locally, it would be an error; otherwise if it is seen
+ // by a global aggregator, it is simple ignored.
+ if (typeTag == ATypeTag.SYSTEM_NULL) {
+ processSystemNull();
+ return;
+ }
+
+ if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
+ aggType = typeTag;
+ cmp = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
+ .createBinaryComparator();
+ if (tpc != null) {
+ tempValForCasting.reset();
+ try {
+ tpc.promote(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
+ outputVal.getLength() - 1, tempValForCasting);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ outputVal.reset();
+ outputVal.assign(tempValForCasting);
+ }
+ if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+ outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+ outputVal.assign(inputVal);
+ }
+
+ } else {
+ tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
+ if (tpc != null) {
+ tempValForCasting.reset();
+ try {
+ tpc.promote(inputVal.getByteArray(), inputVal.getStartOffset() + 1, inputVal.getLength() - 1,
+ tempValForCasting);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
+ tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
+ outputVal.getLength()) < 0) {
+ outputVal.assign(tempValForCasting);
+ }
+ } else {
+ if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+ outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+ outputVal.assign(inputVal);
+ }
+ }
+
+ }
+ }
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ switch (aggType) {
+ case NULL: {
+ out.writeByte(ATypeTag.NULL.serialize());
+ break;
+ }
+ case SYSTEM_NULL: {
+ finishSystemNull();
+ break;
+ }
+ default: {
+ out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ protected boolean skipStep() {
+ return false;
+ }
+
+ protected abstract void processNull();
+
+ protected abstract void processSystemNull() throws AlgebricksException;
+
+ protected abstract void finishSystemNull() throws IOException;
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
new file mode 100644
index 0000000..1332dd5
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunction {
+ protected DataOutput out;
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private double sum;
+ protected ATypeTag aggType;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ @SuppressWarnings("rawtypes")
+ protected ISerializerDeserializer serde;
+
+ public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider)
+ throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ }
+
+ @Override
+ public void init() {
+ aggType = ATypeTag.SYSTEM_NULL;
+ sum = 0.0;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ if (skipStep()) {
+ return;
+ }
+ inputVal.reset();
+ eval.evaluate(tuple);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ if (typeTag == ATypeTag.NULL) {
+ processNull();
+ return;
+ } else if (aggType == ATypeTag.SYSTEM_NULL) {
+ aggType = typeTag;
+ } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
+ throw new AlgebricksException("Unexpected type " + typeTag
+ + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
+ }
+
+ if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ aggType = typeTag;
+ }
+
+ switch (typeTag) {
+ case INT8: {
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ break;
+ }
+ case SYSTEM_NULL: {
+ processSystemNull();
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ switch (aggType) {
+ case INT8: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+ aInt8.setValue((byte) sum);
+ serde.serialize(aInt8, out);
+ break;
+ }
+ case INT16: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue((short) sum);
+ serde.serialize(aInt16, out);
+ break;
+ }
+ case INT32: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue((int) sum);
+ serde.serialize(aInt32, out);
+ break;
+ }
+ case INT64: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue((long) sum);
+ serde.serialize(aInt64, out);
+ break;
+ }
+ case FLOAT: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue((float) sum);
+ serde.serialize(aFloat, out);
+ break;
+ }
+ case DOUBLE: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(sum);
+ serde.serialize(aDouble, out);
+ break;
+ }
+ case NULL: {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ break;
+ }
+ case SYSTEM_NULL: {
+ finishSystemNull();
+ break;
+ }
+ default:
+ throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
+ + aggType + "). ");
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ protected boolean skipStep() {
+ return false;
+ }
+ protected abstract void processNull();
+ protected abstract void processSystemNull() throws AlgebricksException;
+ protected abstract void finishSystemNull() throws IOException;
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index 7815606..3ff836a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -15,51 +15,16 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class AvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -78,165 +43,13 @@
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType tmpRecType;
- try {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, true);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = tmpRecType;
-
return new ICopyAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private double sum;
- private long count;
- private ATypeTag aggType;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
- private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
- new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> intSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
-
- @Override
- public void init() throws AlgebricksException {
- aggType = ATypeTag.SYSTEM_NULL;
- sum = 0.0;
- count = 0;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
- throw new AlgebricksException("Unexpected type " + typeTag
- + " in aggregation input stream. Expected type " + aggType + ".");
- } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
- aggType = typeTag;
- }
-
- if (typeTag != ATypeTag.SYSTEM_NULL) {
- ++count;
- }
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
- }
- }
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (count == 0 || aggType == ATypeTag.NULL) {
- nullSerde.serialize(ANull.NULL, out);
- } else {
- aDouble.setValue(sum / count);
- doubleSerde.serialize(aDouble, out);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- if (count == 0) {
- if (GlobalConfig.DEBUG) {
- GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
- }
- } else {
- try {
- if (aggType == ATypeTag.NULL) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(count);
- intSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- }
- };
+ return new AvgAggregateFunction(args, provider);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
new file mode 100644
index 0000000..2470e5a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class AvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public AvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processDataValues(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishFinalResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ @Override
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
+
+ @Override
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
index c63ab62..7c97fc2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
@@ -14,70 +14,21 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
* COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
*/
-public class CountAggregateFunction implements ICopyAggregateFunction {
- private AMutableInt64 result = new AMutableInt64(-1);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval;
- private long cnt;
- private DataOutput out;
+public class CountAggregateFunction extends AbstractCountAggregateFunction {
public CountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
- eval = args[0].createEvaluator(inputVal);
- out = output.getDataOutput();
+ super(args, output);
}
- @Override
- public void init() {
- cnt = 0;
+ protected void processNull() {
+ cnt++;
}
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- // Ignore SYSTEM_NULL.
- if (typeTag != ATypeTag.SYSTEM_NULL) {
- cnt++;
- }
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- result.setValue(cnt);
- int64Serde.serialize(result, out);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
index eb5d6bd..230e06b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
@@ -15,45 +15,16 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class GlobalAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -73,142 +44,13 @@
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType tmpRecType;
- try {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = tmpRecType;
-
return new ICopyAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private double globalSum;
- private long globalCount;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
- private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
- new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- private boolean metNull;
-
- @Override
- public void init() {
- globalSum = 0.0;
- globalCount = 0;
- metNull = false;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- byte[] serBytes = inputVal.getByteArray();
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
- switch (typeTag) {
- case NULL: {
- metNull = true;
- break;
- }
- case SYSTEM_NULL: {
- // Ignore and return.
- return;
- }
- case RECORD: {
- // Expected.
- break;
- }
- default: {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
- }
- }
-
- // The record length helps us determine whether the input record fields are nullable.
- int recordLength = ARecordSerializerDeserializer.getRecordLength(serBytes, 1);
- int nullBitmapSize = 1;
- if (recordLength == 29) {
- nullBitmapSize = 0;
- }
- int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, nullBitmapSize,
- false);
- if (offset1 == 0) // the sum is null
- metNull = true;
- else
- globalSum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
- int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 1, nullBitmapSize,
- false);
- if (offset2 != 0) // the count is not null
- globalCount += AInt64SerializerDeserializer.getLong(serBytes, offset2);
-
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (globalCount == 0 || metNull)
- nullSerde.serialize(ANull.NULL, out);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, out);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- try {
- if (metNull || globalCount == 0) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(globalCount);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- };
+ return new GlobalAvgAggregateFunction(args, provider);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
new file mode 100644
index 0000000..a63d4bc
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public GlobalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processPartialResults(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishFinalResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finishPartialResults();
+ }
+
+ @Override
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
+
+ @Override
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..0808e81
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class GlobalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new GlobalSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.GLOBAL_SQL_AVG;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new GlobalSqlAvgAggregateFunction(args, provider);
+ }
+ };
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..02c3c2c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalSqlAvgAggregateFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class GlobalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public GlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processPartialResults(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishFinalResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finishPartialResults();
+ }
+
+ @Override
+ protected void processNull() {
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index 09a659c..fd98764 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -15,50 +15,16 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ADouble;
-import edu.uci.ics.asterix.om.base.AInt64;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
-import edu.uci.ics.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class LocalAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -77,157 +43,13 @@
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
-
return new ICopyAggregateFunctionFactory() {
private static final long serialVersionUID = 1L;
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- List<IAType> unionList = new ArrayList<IAType>();
- unionList.add(BuiltinType.ANULL);
- unionList.add(BuiltinType.ADOUBLE);
- ARecordType tmpRecType;
- try {
- tmpRecType = new ARecordType(null, new String[] { "sum", "count" }, new IAType[] {
- new AUnionType(unionList, "OptionalDouble"), BuiltinType.AINT64 }, false);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
-
- final ARecordType recType = tmpRecType;
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private ATypeTag aggType;
- private double sum;
- private long count;
-
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
- private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
- private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
- private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
- private ClosedRecordConstructorEval recordEval = new ClosedRecordConstructorEval(recType,
- new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, out);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
-
- @Override
- public void init() {
- aggType = ATypeTag.SYSTEM_NULL;
- sum = 0.0;
- count = 0;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
- throw new AlgebricksException("Unexpected type " + typeTag
- + " in aggregation input stream. Expected type " + aggType + ".");
- } else if (ATypeHierarchy.canPromote(aggType, typeTag)) {
- aggType = typeTag;
- }
-
- if (typeTag != ATypeTag.SYSTEM_NULL) {
- ++count;
- }
-
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
- + typeTag);
- }
- }
- inputVal.reset();
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (count == 0 && aggType != ATypeTag.NULL) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- return;
- }
- if (aggType == ATypeTag.NULL) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(count);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
- };
+ return new LocalAvgAggregateFunction(args, provider);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
new file mode 100644
index 0000000..2c7abd4
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public LocalAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processDataValues(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishPartialResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ @Override
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
+
+ @Override
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
index 9e411b6..c540c43 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..6a1f60e
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalSqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.LOCAL_SQL_AVG;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new LocalSqlAvgAggregateFunction(args, provider);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
new file mode 100644
index 0000000..6a97410
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlAvgAggregateFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class LocalSqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public LocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processDataValues(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishPartialResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ @Override
+ protected void processNull() {
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..6b31244
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMaxAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_MAX;
+
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalSqlMaxAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlMinMaxAggregateFunction(args, provider, false, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..4946703
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlMinAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_MIN;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalSqlMinAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlMinMaxAggregateFunction(args, provider, true, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..1f47727
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSqlSumAggregateDescriptor.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = AsterixBuiltinFunctions.LOCAL_SQL_SUM;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalSqlSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlSumAggregateFunction(args, provider, true);
+ };
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
index 3e0b65c..0e35cd3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
index 8599913..0809701 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -14,164 +14,46 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
-import edu.uci.ics.asterix.om.types.hierachy.ITypePromoteComputer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class MinMaxAggregateFunction implements ICopyAggregateFunction {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
- private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
- private DataOutput out;
- private ICopyEvaluator eval;
- private ATypeTag aggType;
- private IBinaryComparator cmp;
- private ITypePromoteComputer tpc;
- private final boolean isMin;
+public class MinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
private final boolean isLocalAgg;
public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
boolean isLocalAgg) throws AlgebricksException {
- out = provider.getDataOutput();
- eval = args[0].createEvaluator(inputVal);
- this.isMin = isMin;
+ super(args, provider, isMin);
this.isLocalAgg = isLocalAgg;
}
- @Override
- public void init() {
- aggType = ATypeTag.SYSTEM_NULL;
- outputVal.reset();
- tempValForCasting.reset();
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
}
@Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
+ }
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
}
- if (aggType == ATypeTag.SYSTEM_NULL) {
- if (typeTag == ATypeTag.SYSTEM_NULL) {
- // Ignore.
- return;
- }
- // First value encountered. Set type, comparator, and initial value.
- aggType = typeTag;
- // Set comparator.
- IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(aggType, isMin);
- cmp = cmpFactory.createBinaryComparator();
- // Initialize min value.
- outputVal.assign(inputVal);
- } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
- throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
- + aggType + ".");
+ }
+
+ @Override
+ protected void finishSystemNull() throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
} else {
-
- // If a system_null is encountered locally, it would be an error; otherwise if it is seen
- // by a global aggregator, it is simple ignored.
- if (typeTag == ATypeTag.SYSTEM_NULL) {
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- } else {
- return;
- }
- }
-
- if (ATypeHierarchy.canPromote(aggType, typeTag)) {
- tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
- aggType = typeTag;
- cmp = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
- .createBinaryComparator();
- if (tpc != null) {
- tempValForCasting.reset();
- try {
- tpc.promote(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
- outputVal.getLength() - 1, tempValForCasting);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- outputVal.reset();
- outputVal.assign(tempValForCasting);
- }
- if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
- outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
- outputVal.assign(inputVal);
- }
-
- } else {
- tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
- if (tpc != null) {
- tempValForCasting.reset();
- try {
- tpc.promote(inputVal.getByteArray(), inputVal.getStartOffset() + 1, inputVal.getLength() - 1,
- tempValForCasting);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
- tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
- outputVal.getLength()) < 0) {
- outputVal.assign(tempValForCasting);
- }
- } else {
- if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
- outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
- outputVal.assign(inputVal);
- }
- }
-
- }
+ out.writeByte(ATypeTag.NULL.serialize());
}
}
- @Override
- public void finish() throws AlgebricksException {
- try {
- switch (aggType) {
- case NULL: {
- out.writeByte(ATypeTag.NULL.serialize());
- break;
- }
- case SYSTEM_NULL: {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- out.writeByte(ATypeTag.NULL.serialize());
- }
- break;
- }
- default: {
- out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
- break;
- }
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
new file mode 100644
index 0000000..cf3f3dd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlAvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SqlAvgAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SQL_AVG;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlAvgAggregateFunction(args, provider);
+ }
+ };
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
new file mode 100644
index 0000000..11cf13f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlAvgAggregateFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SqlAvgAggregateFunction extends AbstractAvgAggregateFunction {
+
+ public SqlAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+ super(args, output);
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ processDataValues(tuple);
+ }
+
+ @Override
+ public void finish() throws AlgebricksException {
+ finishFinalResults();
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+
+ @Override
+ protected void processNull() {
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java
new file mode 100644
index 0000000..b484b53
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/**
+ * NULLs are also counted.
+ */
+public class SqlCountAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SqlCountAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SQL_COUNT;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlCountAggregateFunction(args, provider);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java
new file mode 100644
index 0000000..5fb5d72
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlCountAggregateFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+/**
+ * COUNT returns the number of non-null items in the given list. Note that COUNT(NULL) is not allowed.
+ */
+public class SqlCountAggregateFunction extends AbstractCountAggregateFunction {
+
+ public SqlCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ throws AlgebricksException {
+ super(args, output);
+ }
+
+ @Override
+ protected void processNull() {
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java
new file mode 100644
index 0000000..c8e415f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMaxAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SqlMaxAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SQL_MAX;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlMinMaxAggregateFunction(args, provider, false, false);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java
new file mode 100644
index 0000000..fe934f8
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SqlMinAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SQL_MIN;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlMinMaxAggregateFunction(args, provider, true, false);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
new file mode 100644
index 0000000..56236e3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlMinMaxAggregateFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlMinMaxAggregateFunction extends AbstractMinMaxAggregateFunction {
+ private final boolean isLocalAgg;
+
+ public SqlMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
+ boolean isLocalAgg) throws AlgebricksException {
+ super(args, provider, isMin);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ protected void processNull() {
+ }
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @Override
+ protected void finishSystemNull() throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ out.writeByte(ATypeTag.NULL.serialize());
+ }
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..294d9c1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SqlSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return AsterixBuiltinFunctions.SQL_SUM;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SqlSumAggregateFunction(args, provider, false);
+ };
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
new file mode 100644
index 0000000..4418288
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SqlSumAggregateFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class SqlSumAggregateFunction extends AbstractSumAggregateFunction {
+ private final boolean isLocalAgg;
+
+ public SqlSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+ throws AlgebricksException {
+ super(args, provider);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ protected void processNull() {
+ }
+
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void finishSystemNull() throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 31ad055..42b0346 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -14,202 +14,54 @@
*/
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
import java.io.IOException;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
-import edu.uci.ics.asterix.om.types.hierachy.ATypeHierarchy;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public class SumAggregateFunction implements ICopyAggregateFunction {
- private DataOutput out;
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval;
- private double sum;
- private ATypeTag aggType;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
+public class SumAggregateFunction extends AbstractSumAggregateFunction {
private final boolean isLocalAgg;
public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
throws AlgebricksException {
- out = provider.getDataOutput();
- eval = args[0].createEvaluator(inputVal);
+ super(args, provider);
this.isLocalAgg = isLocalAgg;
}
@Override
- public void init() {
- aggType = ATypeTag.SYSTEM_NULL;
- sum = 0.0;
+ protected boolean skipStep() {
+ return (aggType == ATypeTag.NULL);
}
@Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
- if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
- aggType = ATypeTag.NULL;
- return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
- throw new AlgebricksException("Unexpected type " + typeTag
- + " in aggregation input stream. Expected type (or a promotable type to)" + aggType + ".");
- }
+ protected void processNull() {
+ aggType = ATypeTag.NULL;
+ }
- if (ATypeHierarchy.canPromote(aggType, typeTag)) {
- aggType = typeTag;
- }
-
- switch (typeTag) {
- case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- break;
- }
- case SYSTEM_NULL: {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
- }
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
- }
+ @Override
+ protected void processSystemNull() throws AlgebricksException {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
}
}
@SuppressWarnings("unchecked")
@Override
- public void finish() throws AlgebricksException {
- try {
- switch (aggType) {
- case INT8: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
- break;
- }
- case INT16: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
- break;
- }
- case INT32: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
- break;
- }
- case INT64: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
- break;
- }
- case FLOAT: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
- break;
- }
- case DOUBLE: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(sum);
- serde.serialize(aDouble, out);
- break;
- }
- case NULL: {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- break;
- }
- case SYSTEM_NULL: {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
- break;
- }
- default:
- throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
- + aggType + "). ");
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ protected void finishSystemNull() throws IOException {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
}
}
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index ea75c77..5ae2459 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -65,22 +65,43 @@
import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarCountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSqlSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableCountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalSqlAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSqlSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSqlSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.GlobalSqlAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSqlSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.SqlSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.SumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.stream.EmptyStreamAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.stream.NonEmptyStreamAggregateDescriptor;
@@ -426,6 +447,33 @@
temp.add(ScalarMaxAggregateDescriptor.FACTORY);
temp.add(ScalarMinAggregateDescriptor.FACTORY);
+ // SQL aggregates
+ temp.add(SqlCountAggregateDescriptor.FACTORY);
+ temp.add(SqlAvgAggregateDescriptor.FACTORY);
+ temp.add(LocalSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(GlobalSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(SqlSumAggregateDescriptor.FACTORY);
+ temp.add(LocalSqlSumAggregateDescriptor.FACTORY);
+ temp.add(SqlMaxAggregateDescriptor.FACTORY);
+ temp.add(LocalSqlMaxAggregateDescriptor.FACTORY);
+ temp.add(SqlMinAggregateDescriptor.FACTORY);
+ temp.add(LocalSqlMinAggregateDescriptor.FACTORY);
+
+ // SQL serializable aggregates
+ temp.add(SerializableSqlCountAggregateDescriptor.FACTORY);
+ temp.add(SerializableSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(SerializableLocalSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(SerializableGlobalSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(SerializableSqlSumAggregateDescriptor.FACTORY);
+ temp.add(SerializableLocalSqlSumAggregateDescriptor.FACTORY);
+
+ // SQL scalar aggregates
+ temp.add(ScalarSqlCountAggregateDescriptor.FACTORY);
+ temp.add(ScalarSqlAvgAggregateDescriptor.FACTORY);
+ temp.add(ScalarSqlSumAggregateDescriptor.FACTORY);
+ temp.add(ScalarSqlMaxAggregateDescriptor.FACTORY);
+ temp.add(ScalarSqlMinAggregateDescriptor.FACTORY);
+
// new functions - constructors
temp.add(ABooleanConstructorDescriptor.FACTORY);
temp.add(ANullConstructorDescriptor.FACTORY);