Merged asterix_stabilization r529:r543.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@544 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 76039ab..3b11b0a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -35,6 +35,7 @@
import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
import edu.uci.ics.asterix.optimizer.rules.NestGroupByRule;
import edu.uci.ics.asterix.optimizer.rules.PullPositionalVariableFromUnnestRule;
+import edu.uci.ics.asterix.optimizer.rules.PushAggFuncIntoStandaloneAggregateRule;
import edu.uci.ics.asterix.optimizer.rules.PushAggregateIntoGroupbyRule;
import edu.uci.ics.asterix.optimizer.rules.PushFieldAccessRule;
import edu.uci.ics.asterix.optimizer.rules.PushGroupByThroughProduct;
@@ -63,8 +64,8 @@
import edu.uci.ics.hyracks.algebricks.rewriter.rules.InsertProjectBeforeUnionRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroHashPartitionMergeExchange;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroJoinInsideSubplanRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceCombinerRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForStandaloneAggregRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceAggregateCombinerRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForSubplanRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
@@ -94,7 +95,7 @@
public final static List<IAlgebraicRewriteRule> buildNormalizationRuleCollection() {
List<IAlgebraicRewriteRule> normalization = new LinkedList<IAlgebraicRewriteRule>();
normalization.add(new EliminateSubplanRule());
- normalization.add(new IntroduceGroupByForStandaloneAggregRule());
+ normalization.add(new PushAggFuncIntoStandaloneAggregateRule());
normalization.add(new BreakSelectIntoConjunctsRule());
normalization.add(new ExtractGbyExpressionsRule());
normalization.add(new ExtractDistinctByExpressionsRule());
@@ -168,7 +169,8 @@
consolidation.add(new ConsolidateSelectsRule());
consolidation.add(new ConsolidateAssignsRule());
consolidation.add(new InlineAssignIntoAggregateRule());
- consolidation.add(new IntroduceCombinerRule());
+ consolidation.add(new IntroduceGroupByCombinerRule());
+ consolidation.add(new IntroduceAggregateCombinerRule());
consolidation.add(new CountVarToCountOneRule());
consolidation.add(new IntroduceSelectAccessMethodRule());
consolidation.add(new IntroduceJoinAccessMethodRule());
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
new file mode 100644
index 0000000..c5a1cb0
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes aggregate functions into a stand alone aggregate operator (no group by).
+ */
+public class PushAggFuncIntoStandaloneAggregateRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ // Pattern to match: assign <-- aggregate <-- !(group-by)
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+ return false;
+ }
+ Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
+ if (op2.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+ return false;
+ }
+ // If there's a group by below the agg, then we want to have the agg pushed into the group by.
+ Mutable<ILogicalOperator> opRef3 = op2.getInputs().get(0);
+ AbstractLogicalOperator op3 = (AbstractLogicalOperator) opRef3.getValue();
+ if (op3.getOperatorTag() == LogicalOperatorTag.GROUP) {
+ return false;
+ }
+
+ AssignOperator assignOp = (AssignOperator) op;
+ AggregateOperator aggOp = (AggregateOperator) op2;
+ if (aggOp.getVariables().size() != 1) {
+ return false;
+ }
+
+ // Make sure the agg expr is a listify.
+ ILogicalExpression aggExpr = aggOp.getExpressions().get(0).getValue();
+ if (aggExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ AbstractFunctionCallExpression origAggFuncExpr = (AbstractFunctionCallExpression) aggExpr;
+ if (origAggFuncExpr.getFunctionIdentifier() != AsterixBuiltinFunctions.LISTIFY) {
+ return false;
+ }
+
+ LogicalVariable aggVar = aggOp.getVariables().get(0);
+ List<LogicalVariable> used = new LinkedList<LogicalVariable>();
+ VariableUtilities.getUsedVariables(assignOp, used);
+ if (!used.contains(aggVar)) {
+ return false;
+ }
+
+ Mutable<ILogicalExpression> srcAssignExprRef = fingAggFuncExprRef(assignOp.getExpressions(), aggVar);
+ if (srcAssignExprRef == null) {
+ return false;
+ }
+ AbstractFunctionCallExpression assignFuncExpr = (AbstractFunctionCallExpression) srcAssignExprRef.getValue();
+ FunctionIdentifier aggFuncIdent = AsterixBuiltinFunctions.getAggregateFunction(assignFuncExpr.getFunctionIdentifier());
+
+ // Push the agg func into the agg op.
+ AbstractFunctionCallExpression aggOpExpr = (AbstractFunctionCallExpression) aggOp.getExpressions().get(0).getValue();
+ List<Mutable<ILogicalExpression>> aggArgs = new ArrayList<Mutable<ILogicalExpression>>();
+ aggArgs.add(aggOpExpr.getArguments().get(0));
+ AggregateFunctionCallExpression aggFuncExpr = AsterixBuiltinFunctions.makeAggregateFunctionExpression(aggFuncIdent, aggArgs);
+ aggOp.getExpressions().get(0).setValue(aggFuncExpr);
+
+ // The assign now just "renames" the variable to make sure the upstream plan still works.
+ srcAssignExprRef.setValue(new VariableReferenceExpression(aggVar));
+
+ // Create a new assign for a TRUE variable.
+ LogicalVariable trueVar = context.newVar();
+ AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+
+ ILogicalOperator aggInput = aggOp.getInputs().get(0).getValue();
+ aggOp.getInputs().get(0).setValue(trueAssignOp);
+ trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(aggInput));
+
+ // Set partitioning variable.
+ aggOp.setPartitioningVariable(trueVar);
+
+ context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
+ context.computeAndSetTypeEnvironmentForOperator(aggOp);
+ context.computeAndSetTypeEnvironmentForOperator(assignOp);
+
+ return true;
+ }
+
+ private Mutable<ILogicalExpression> fingAggFuncExprRef(List<Mutable<ILogicalExpression>> exprRefs, LogicalVariable aggVar) {
+ for (Mutable<ILogicalExpression> exprRef : exprRefs) {
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ continue;
+ }
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+ FunctionIdentifier funcIdent = AsterixBuiltinFunctions.getAggregateFunction(funcExpr.getFunctionIdentifier());
+ if (funcIdent == null) {
+ // Recursively look in func args.
+ return fingAggFuncExprRef(funcExpr.getArguments(), aggVar);
+ }
+ // Check if this is the expr that uses aggVar.
+ Collection<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
+ funcExpr.getUsedVariables(usedVars);
+ if (usedVars.contains(aggVar)) {
+ return exprRef;
+ }
+ }
+ return null;
+ }
+}
diff --git a/asterix-app/src/test/resources/optimizerts/results/unnest-to-join_02.plan b/asterix-app/src/test/resources/optimizerts/results/unnest-to-join_02.plan
index bfb15c7..1f56c4f 100644
--- a/asterix-app/src/test/resources/optimizerts/results/unnest-to-join_02.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/unnest-to-join_02.plan
@@ -1,21 +1,11 @@
-- SINK_WRITE |UNPARTITIONED|
-- STREAM_PROJECT |UNPARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$8] |UNPARTITIONED|
- {
- -- AGGREGATE |UNPARTITIONED|
- -- NESTED_TUPLE_SOURCE |UNPARTITIONED|
- }
- -- ONE_TO_ONE_EXCHANGE |LOCAL|
- -- STABLE_SORT [$$8(ASC)] |LOCAL|
- -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- STREAM_PROJECT |UNPARTITIONED|
- -- ASSIGN |UNPARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- HYBRID_HASH_JOIN [$$0][$$1] |UNPARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- UNNEST |UNPARTITIONED|
- -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- UNNEST |UNPARTITIONED|
- -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- AGGREGATE |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- HYBRID_HASH_JOIN [$$0][$$1] |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- UNNEST |UNPARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 1cf8519..d567c45 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -10,7 +10,6 @@
fuzzyjoin/dblp-csx-dblp-aqlplus_1.aql
fuzzyjoin/user-vis-int-vis-user-lot-aqlplus_1.aql
subset-collection/04.aql
-quantifiers/everysat_01.aql
custord/freq-clerk.aql
custord/denorm-cust-order_01.aql
custord/denorm-cust-order_03.aql
@@ -36,4 +35,4 @@
quantifiers/somesat_04.aql
quantifiers/somesat_05.aql
quantifiers/everysat_02.aql
-quantifiers/everysat_03.aql
+quantifiers/everysat_03.aql
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql
new file mode 100644
index 0000000..ab2a6fb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description : Tests that avg aggregation correctly returns null for an empty stream,
+ * without an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_avg_empty_01.adm";
+
+avg(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql
new file mode 100644
index 0000000..3583ce0
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description : Tests that avg aggregation correctly returns null for an empty stream,
+ * with an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+ id: int32,
+ val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_avg_empty_02.adm";
+
+avg(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql
new file mode 100644
index 0000000..f03c252
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description : Tests that count aggregation correctly returns 0 for an empty stream,
+ * without an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_count_empty_01.adm";
+
+count(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql
new file mode 100644
index 0000000..0a6cc8e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description : Tests that count aggregation correctly returns 0 for an empty stream,
+ * with an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+ id: int32,
+ val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_count_empty_02.adm";
+
+count(
+ for $x in dataset('Test')
+ return $x.val
+)
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql
new file mode 100644
index 0000000..2464b64
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description : Tests that max aggregation correctly returns null for an empty stream,
+ * without an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_max_empty_01.adm";
+
+max(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql
new file mode 100644
index 0000000..79ae1d8
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description : Tests that max aggregation correctly returns null for an empty stream,
+ * with an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+ id: int32,
+ val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_max_empty_02.adm";
+
+max(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql
new file mode 100644
index 0000000..30abd1e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description : Tests that min aggregation correctly returns null for an empty stream,
+ * without an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_min_empty_01.adm";
+
+min(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql
new file mode 100644
index 0000000..99d49f4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description : Tests that min aggregation correctly returns null for an empty stream,
+ * with an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+ id: int32,
+ val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_min_empty_02.adm";
+
+min(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql
new file mode 100644
index 0000000..b4e26b8
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description : Tests that sum aggregation correctly returns null for an empty stream,
+ * without an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_sum_empty_01.adm";
+
+sum(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql
new file mode 100644
index 0000000..a94457a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description : Tests that sum aggregation correctly returns null for an empty stream,
+ * with an aggregate combiner.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+ id: int32,
+ val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_sum_empty_02.adm";
+
+sum(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/quantifiers/everysat_04.aql b/asterix-app/src/test/resources/runtimets/queries/quantifiers/everysat_04.aql
new file mode 100644
index 0000000..b9eccfd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/quantifiers/everysat_04.aql
@@ -0,0 +1,23 @@
+/*
+ * Description : Tests that universal quantification returns true/false correctly.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/quantifiers_everysat_04.adm";
+
+let $x := [
+every $x in [false,false] satisfies $x,
+every $x in [true,false] satisfies $x,
+every $x in [false,true] satisfies $x,
+every $x in [true,true] satisfies $x,
+every $x in [false,false] satisfies not($x),
+every $x in [true,false] satisfies not($x),
+every $x in [false,true] satisfies not($x),
+every $x in [true,true] satisfies not($x)
+]
+for $i in $x
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql b/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql
new file mode 100644
index 0000000..6e8892c
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql
@@ -0,0 +1,23 @@
+/*
+ * Description : Tests that existential quantification returns true/false correctly.
+ * Success : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/quantifiers_somesat_06.adm";
+
+let $x := [
+some $x in [false,false] satisfies $x,
+some $x in [true,false] satisfies $x,
+some $x in [false,true] satisfies $x,
+some $x in [true,true] satisfies $x,
+some $x in [false,false] satisfies not($x),
+some $x in [true,false] satisfies not($x),
+some $x in [false,true] satisfies not($x),
+some $x in [true,true] satisfies not($x)
+]
+for $i in $x
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm
@@ -0,0 +1 @@
+0
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm
@@ -0,0 +1 @@
+0
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_01.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_01.adm
new file mode 100644
index 0000000..d6cf966
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_01.adm
@@ -0,0 +1 @@
+-30
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_02.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_02.adm
new file mode 100644
index 0000000..fd6a9d9
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_02.adm
@@ -0,0 +1,12 @@
+false
+true
+false
+true
+false
+false
+true
+false
+false
+false
+false
+false
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_04.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_04.adm
new file mode 100644
index 0000000..98ce28d
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_04.adm
@@ -0,0 +1,8 @@
+false
+false
+false
+true
+true
+false
+false
+false
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm
new file mode 100644
index 0000000..2cc1766
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm
@@ -0,0 +1,8 @@
+false
+true
+true
+true
+true
+true
+true
+false
\ No newline at end of file
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 417948f..2af5d9d 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -234,8 +234,11 @@
public final static FunctionIdentifier AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-avg", 1);
public final static FunctionIdentifier COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-count", 1);
public final static FunctionIdentifier SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sum", 1);
+ public final static FunctionIdentifier LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum", 1);
public final static FunctionIdentifier MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-max", 1);
+ public final static FunctionIdentifier LOCAL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-max", 1);
public final static FunctionIdentifier MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-min", 1);
+ public final static FunctionIdentifier LOCAL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-min", 1);
public final static FunctionIdentifier GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"agg-global-avg", 1);
public final static FunctionIdentifier LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -259,6 +262,8 @@
"count-serial", 1);
public final static FunctionIdentifier SERIAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"sum-serial", 1);
+ public final static FunctionIdentifier SERIAL_LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "local-sum-serial", 1);
public final static FunctionIdentifier SERIAL_GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"global-avg-serial", 1);
public final static FunctionIdentifier SERIAL_LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -514,7 +519,9 @@
add(MAKE_FIELD_INDEX_HANDLE, null); // TODO
add(MAKE_FIELD_NAME_HANDLE, null); // TODO
add(MAX, NonTaggedSumTypeComputer.INSTANCE);
+ add(LOCAL_MAX, NonTaggedSumTypeComputer.INSTANCE);
add(MIN, NonTaggedSumTypeComputer.INSTANCE);
+ add(LOCAL_MIN, NonTaggedSumTypeComputer.INSTANCE);
add(NON_EMPTY_STREAM, ABooleanTypeComputer.INSTANCE);
add(NULL_CONSTRUCTOR, ANullTypeComputer.INSTANCE);
add(NUMERIC_UNARY_MINUS, NonTaggedUnaryMinusTypeComputer.INSTANCE);
@@ -571,6 +578,7 @@
add(SERIAL_GLOBAL_AVG, OptionalADoubleTypeComputer.INSTANCE);
add(SERIAL_LOCAL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE);
add(SERIAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
+ add(SERIAL_LOCAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
add(SIMILARITY_JACCARD, AFloatTypeComputer.INSTANCE);
add(SIMILARITY_JACCARD_CHECK, OrderedListOfAnyTypeComputer.INSTANCE);
add(SIMILARITY_JACCARD_SORTED, AFloatTypeComputer.INSTANCE);
@@ -622,6 +630,7 @@
});
add(SUBSTRING, SubstringTypeComputer.INSTANCE);
add(SUM, NonTaggedSumTypeComputer.INSTANCE);
+ add(LOCAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
add(SWITCH_CASE, NonTaggedSwitchCaseComputer.INSTANCE);
add(REG_EXP, ABooleanTypeComputer.INSTANCE);
add(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE);
@@ -671,15 +680,17 @@
addGlobalAgg(COUNT, SUM);
addAgg(MAX);
- addLocalAgg(MAX, MAX);
+ addAgg(LOCAL_MAX);
+ addLocalAgg(MAX, LOCAL_MAX);
addGlobalAgg(MAX, MAX);
addAgg(MIN);
- addLocalAgg(MIN, MIN);
+ addLocalAgg(MIN, LOCAL_MIN);
addGlobalAgg(MIN, MIN);
addAgg(SUM);
- addLocalAgg(SUM, SUM);
+ addAgg(LOCAL_SUM);
+ addLocalAgg(SUM, LOCAL_SUM);
addGlobalAgg(SUM, SUM);
addAgg(LISTIFY);
@@ -688,6 +699,7 @@
addSerialAgg(AVG, SERIAL_AVG);
addSerialAgg(COUNT, SERIAL_COUNT);
addSerialAgg(SUM, SERIAL_SUM);
+ addSerialAgg(LOCAL_SUM, SERIAL_LOCAL_SUM);
addSerialAgg(LOCAL_AVG, SERIAL_LOCAL_AVG);
addSerialAgg(GLOBAL_AVG, SERIAL_GLOBAL_AVG);
@@ -702,7 +714,8 @@
addGlobalAgg(SERIAL_AVG, SERIAL_GLOBAL_AVG);
addAgg(SERIAL_SUM);
- addLocalAgg(SERIAL_SUM, SERIAL_SUM);
+ addAgg(SERIAL_LOCAL_SUM);
+ addLocalAgg(SERIAL_SUM, SERIAL_LOCAL_SUM);
addGlobalAgg(SERIAL_SUM, SERIAL_SUM);
}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
index aac5447..b75c074 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
@@ -23,7 +23,7 @@
FLOAT(11),
DOUBLE(12),
STRING(13),
- NULL(14),
+ NULL(14),
BOOLEAN(15),
DATETIME(16),
DATE(17),
@@ -42,7 +42,8 @@
LINE(30),
POLYGON(31),
CIRCLE(32),
- RECTANGLE(33);
+ RECTANGLE(33),
+ SYSTEM_NULL(34);
private byte value;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index e4c4931..06fc5fd 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
@@ -44,8 +43,6 @@
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"global-avg-serial", 1);
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
- private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableGlobalAvgAggregateDescriptor();
@@ -119,12 +116,25 @@
inputVal.reset();
eval.evaluate(tuple);
byte[] serBytes = inputVal.getByteArray();
- if (serBytes[0] == SER_NULL_TYPE_TAG)
- metNull = true;
- if (serBytes[0] != SER_RECORD_TYPE_TAG) {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
- }
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ switch (typeTag) {
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ return;
+ }
+ case RECORD: {
+ // Expected.
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ }
+ }
int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
if (offset1 == 0) // the sum is null
metNull = true;
@@ -146,19 +156,15 @@
long globalCount = BufferSerDeUtil.getLong(state, start + 8);
boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
- if (globalCount == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, result);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, result);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ try {
+ if (globalCount == 0 || metNull)
+ nullSerde.serialize(ANull.NULL, result);
+ else {
+ aDouble.setValue(globalSum / globalCount);
+ doubleSerde.serialize(aDouble, result);
}
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
@@ -173,25 +179,21 @@
recordEval = new ClosedRecordConstructorEval(recType,
new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
- if (globalCount == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(globalCount);
- longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ try {
+ if (globalCount == 0 || metNull) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(globalSum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
}
+ countBytes.reset();
+ aInt64.setValue(globalCount);
+ longSerde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
};
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index f5c9538..5cef25c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -177,27 +176,25 @@
if (recordEval == null)
recordEval = new ClosedRecordConstructorEval(recType,
new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
- if (count == 0) {
- if (GlobalConfig.DEBUG) {
- GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ try {
+ if (count == 0) {
+ result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ return;
}
- } else {
- try {
- if (metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt64.setValue(count);
- int64Serde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ if (metNull) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
}
+ countBytes.reset();
+ aInt64.setValue(count);
+ int64Serde.serialize(aInt64, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
new file mode 100644
index 0000000..d080b27
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "local-sum-serial", 1);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new SerializableLocalSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopySerializableAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopySerializableAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+ return new SerializableSumAggregateFunction(args, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
index e689ab4..b97e35b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
@@ -1,40 +1,14 @@
package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
@@ -55,182 +29,12 @@
public ICopySerializableAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
return new ICopySerializableAggregateFunctionFactory() {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
@Override
public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
- return new ICopySerializableAggregateFunction() {
-
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
- @Override
- public void init(DataOutput state) throws AlgebricksException {
- try {
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeBoolean(false);
- state.writeDouble(0.0);
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
-
- @Override
- public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
- throws AlgebricksException {
- int pos = start;
- boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
- double sum = BufferSerDeUtil.getDouble(state, pos);
-
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
-
- pos = start;
- BufferSerDeUtil.writeBoolean(metInt8s, state, pos++);
- BufferSerDeUtil.writeBoolean(metInt16s, state, pos++);
- BufferSerDeUtil.writeBoolean(metInt32s, state, pos++);
- BufferSerDeUtil.writeBoolean(metInt64s, state, pos++);
- BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
- BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
- BufferSerDeUtil.writeBoolean(metNull, state, pos++);
- BufferSerDeUtil.writeDouble(sum, state, pos);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
- int pos = start;
- boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
- boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
- double sum = BufferSerDeUtil.getDouble(state, pos);
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(sum);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
- } else {
- GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
- }
-
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput out)
- throws AlgebricksException {
- finish(state, start, len, out);
- }
- };
+ return new SerializableSumAggregateFunction(args, false);
}
};
}
-
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
new file mode 100644
index 0000000..ef5f6d1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -0,0 +1,219 @@
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+ private final boolean isLocalAgg = false;
+
+ public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+ throws AlgebricksException {
+ eval = args[0].createEvaluator(inputVal);
+ }
+
+ @Override
+ public void init(DataOutput state) throws AlgebricksException {
+ try {
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeBoolean(false);
+ state.writeDouble(0.0);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
+ throws AlgebricksException {
+ int pos = start;
+ boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
+ double sum = BufferSerDeUtil.getDouble(state, pos);
+
+ inputVal.reset();
+ eval.evaluate(tuple);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
+ .getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ metInt8s = true;
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ metInt16s = true;
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ metInt32s = true;
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ metInt64s = true;
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ metFloats = true;
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ metDoubles = true;
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type "
+ + typeTag);
+ }
+ }
+ }
+
+ pos = start;
+ BufferSerDeUtil.writeBoolean(metInt8s, state, pos++);
+ BufferSerDeUtil.writeBoolean(metInt16s, state, pos++);
+ BufferSerDeUtil.writeBoolean(metInt32s, state, pos++);
+ BufferSerDeUtil.writeBoolean(metInt64s, state, pos++);
+ BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
+ BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
+ BufferSerDeUtil.writeBoolean(metNull, state, pos++);
+ BufferSerDeUtil.writeDouble(sum, state, pos);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+ int pos = start;
+ boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
+ boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
+ double sum = BufferSerDeUtil.getDouble(state, pos);
+ try {
+ if (metNull) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else if (metDoubles) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(sum);
+ serde.serialize(aDouble, out);
+ } else if (metFloats) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue((float) sum);
+ serde.serialize(aFloat, out);
+ } else if (metInt64s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue((long) sum);
+ serde.serialize(aInt64, out);
+ } else if (metInt32s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue((int) sum);
+ serde.serialize(aInt32, out);
+ } else if (metInt16s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue((short) sum);
+ serde.serialize(aInt16, out);
+ } else if (metInt8s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT8);
+ aInt8.setValue((byte) sum);
+ serde.serialize(aInt8, out);
+ } else {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+
+ }
+
+ @Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput out)
+ throws AlgebricksException {
+ finish(state, start, len, out);
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index ecdd194..4b43a0b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -39,6 +39,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ByteArrayAccessibleOutputStream;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
@@ -168,6 +169,11 @@
public void finish() throws AlgebricksException {
if (count == 0) {
GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
+ try {
+ nullSerde.serialize(ANull.NULL, out);
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
} else {
try {
if (metNull)
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
index e86b2bc..1e4516e 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
@@ -45,8 +44,6 @@
private static final long serialVersionUID = 1L;
public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-avg",
1);
- private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
- private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
public IFunctionDescriptor createFunctionDescriptor() {
return new GlobalAvgAggregateDescriptor();
@@ -117,11 +114,24 @@
inputVal.reset();
eval.evaluate(tuple);
byte[] serBytes = inputVal.getByteArray();
- if (serBytes[0] == SER_NULL_TYPE_TAG)
- metNull = true;
- if (serBytes[0] != SER_RECORD_TYPE_TAG) {
- throw new AlgebricksException("Global-Avg is not defined for values of type "
- + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ switch (typeTag) {
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // Ignore and return.
+ return;
+ }
+ case RECORD: {
+ // Expected.
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Global-Avg is not defined for values of type "
+ + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+ }
}
int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, false);
if (offset1 == 0) // the sum is null
@@ -136,43 +146,35 @@
@Override
public void finish() throws AlgebricksException {
- if (globalCount == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull)
- nullSerde.serialize(ANull.NULL, out);
- else {
- aDouble.setValue(globalSum / globalCount);
- doubleSerde.serialize(aDouble, out);
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ try {
+ if (globalCount == 0 || metNull)
+ nullSerde.serialize(ANull.NULL, out);
+ else {
+ aDouble.setValue(globalSum / globalCount);
+ doubleSerde.serialize(aDouble, out);
}
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
@Override
public void finishPartial() throws AlgebricksException {
- if (globalCount == 0) {
- GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
- } else {
- try {
- if (metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(globalSum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt32.setValue(globalCount);
- intSerde.serialize(aInt32, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ try {
+ if (metNull || globalCount == 0) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(globalSum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
}
+ countBytes.reset();
+ aInt32.setValue(globalCount);
+ intSerde.serialize(aInt32, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
};
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index fa05d4b..2ef543f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -168,27 +167,25 @@
@Override
public void finish() throws AlgebricksException {
- if (count == 0) {
- if (GlobalConfig.DEBUG) {
- GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+ try {
+ if (count == 0) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ return;
}
- } else {
- try {
- if (metNull) {
- sumBytes.reset();
- nullSerde.serialize(ANull.NULL, sumBytesOutput);
- } else {
- sumBytes.reset();
- aDouble.setValue(sum);
- doubleSerde.serialize(aDouble, sumBytesOutput);
- }
- countBytes.reset();
- aInt32.setValue(count);
- int32Serde.serialize(aInt32, countBytesOutput);
- recordEval.evaluate(null);
- } catch (IOException e) {
- throw new AlgebricksException(e);
+ if (metNull) {
+ sumBytes.reset();
+ nullSerde.serialize(ANull.NULL, sumBytesOutput);
+ } else {
+ sumBytes.reset();
+ aDouble.setValue(sum);
+ doubleSerde.serialize(aDouble, sumBytesOutput);
}
+ countBytes.reset();
+ aInt32.setValue(count);
+ int32Serde.serialize(aInt32, countBytesOutput);
+ recordEval.evaluate(null);
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
new file mode 100644
index 0000000..11d83b6
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-max",
+ 1);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalMaxAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new MaxAggregateFunction(args, provider, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
new file mode 100644
index 0000000..2fc219f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-min",
+ 1);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalMinAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new MinAggregateFunction(args, provider, true);
+ }
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
new file mode 100644
index 0000000..b2f3e57
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum",
+ 1);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new LocalSumAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+ return new SumAggregateFunction(args, provider, true);
+ };
+ };
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
index e409910..a672b19 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
@@ -1,39 +1,15 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class MaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -50,7 +26,6 @@
return FID;
}
- @SuppressWarnings("unchecked")
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
@@ -60,152 +35,7 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
- private short shortVal = Short.MIN_VALUE;
- private int intVal = Integer.MIN_VALUE;
- private long longVal = Long.MIN_VALUE;
- private float floatVal = Float.MIN_VALUE;
- private double doubleVal = Double.MIN_VALUE;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
- @Override
- public void init() {
- shortVal = Short.MIN_VALUE;
- intVal = Integer.MIN_VALUE;
- longVal = Long.MIN_VALUE;
- floatVal = Float.MIN_VALUE;
- doubleVal = Double.MIN_VALUE;
-
- metInt8s = false;
- metInt16s = false;
- metInt32s = false;
- metInt64s = false;
- metFloats = false;
- metDoubles = false;
- metNull = false;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- throw new NotImplementedException("no implementation for int8's comparator");
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- if (val > shortVal)
- shortVal = val;
- throw new NotImplementedException("no implementation for int16's comparator");
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- if (val > intVal)
- intVal = val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- if (val > longVal)
- longVal = val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- if (val > floatVal)
- floatVal = val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- if (val > doubleVal)
- doubleVal = val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(doubleVal);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue(floatVal);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue(longVal);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue(intVal);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue(shortVal);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- throw new NotImplementedException("no implementation for int8's comparator");
- } else {
- GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
- };
+ return new MaxAggregateFunction(args, provider, false);
}
};
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
new file mode 100644
index 0000000..1cb60b2
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
@@ -0,0 +1,197 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MaxAggregateFunction implements ICopyAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private DataOutput out;
+ private ICopyEvaluator eval;
+ private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
+
+ private short shortVal = Short.MIN_VALUE;
+ private int intVal = Integer.MIN_VALUE;
+ private long longVal = Long.MIN_VALUE;
+ private float floatVal = Float.MIN_VALUE;
+ private double doubleVal = Double.MIN_VALUE;
+
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+ private final boolean isLocalAgg;
+
+ public MaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+ throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init() {
+ shortVal = Short.MIN_VALUE;
+ intVal = Integer.MIN_VALUE;
+ longVal = Long.MIN_VALUE;
+ floatVal = Float.MIN_VALUE;
+ doubleVal = Double.MIN_VALUE;
+
+ metInt8s = false;
+ metInt16s = false;
+ metInt32s = false;
+ metInt64s = false;
+ metFloats = false;
+ metDoubles = false;
+ metNull = false;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
+ .getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ metInt8s = true;
+ throw new NotImplementedException("no implementation for int8's comparator");
+ }
+ case INT16: {
+ metInt16s = true;
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ if (val > shortVal)
+ shortVal = val;
+ throw new NotImplementedException("no implementation for int16's comparator");
+ }
+ case INT32: {
+ metInt32s = true;
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ if (val > intVal)
+ intVal = val;
+ break;
+ }
+ case INT64: {
+ metInt64s = true;
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ if (val > longVal)
+ longVal = val;
+ break;
+ }
+ case FLOAT: {
+ metFloats = true;
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ if (val > floatVal)
+ floatVal = val;
+ break;
+ }
+ case DOUBLE: {
+ metDoubles = true;
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ if (val > doubleVal)
+ doubleVal = val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type "
+ + typeTag);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ if (metNull) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else if (metDoubles) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(doubleVal);
+ serde.serialize(aDouble, out);
+ } else if (metFloats) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue(floatVal);
+ serde.serialize(aFloat, out);
+ } else if (metInt64s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue(longVal);
+ serde.serialize(aInt64, out);
+ } else if (metInt32s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue(intVal);
+ serde.serialize(aInt32, out);
+ } else if (metInt16s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue(shortVal);
+ serde.serialize(aInt16, out);
+ } else if (metInt8s) {
+ throw new NotImplementedException("no implementation for int8's comparator");
+ } else {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
index af54c14..0d2b68b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
@@ -1,39 +1,15 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class MinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -50,7 +26,6 @@
return FID;
}
- @SuppressWarnings("unchecked")
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
@@ -60,154 +35,8 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
- private short shortVal = Short.MAX_VALUE;
- private int intVal = Integer.MAX_VALUE;
- private long longVal = Long.MAX_VALUE;
- private float floatVal = Float.MAX_VALUE;
- private double doubleVal = Double.MAX_VALUE;
-
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
- @Override
- public void init() {
- shortVal = Short.MAX_VALUE;
- intVal = Integer.MAX_VALUE;
- longVal = Long.MAX_VALUE;
- floatVal = Float.MAX_VALUE;
- doubleVal = Double.MAX_VALUE;
-
- metInt8s = false;
- metInt16s = false;
- metInt32s = false;
- metInt64s = false;
- metFloats = false;
- metDoubles = false;
- metNull = false;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- throw new NotImplementedException("no implementation for int8's comparator");
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- if (val < shortVal)
- shortVal = val;
- throw new NotImplementedException("no implementation for int16's comparator");
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- if (val < intVal)
- intVal = val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- if (val < longVal)
- longVal = val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- if (val < floatVal)
- floatVal = val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- if (val < doubleVal)
- doubleVal = val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(doubleVal);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue(floatVal);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue(longVal);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue(intVal);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue(shortVal);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- throw new NotImplementedException("no implementation for int8's comparator");
- } else {
- GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
- }
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
- };
+ return new MinAggregateFunction(args, provider, false);
}
};
}
-
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
new file mode 100644
index 0000000..0fd2355
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
@@ -0,0 +1,205 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MinAggregateFunction implements ICopyAggregateFunction {
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private DataOutput out;
+ private ICopyEvaluator eval;
+ private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats,
+ metDoubles, metNull;
+ private short shortVal = Short.MAX_VALUE;
+ private int intVal = Integer.MAX_VALUE;
+ private long longVal = Long.MAX_VALUE;
+ private float floatVal = Float.MAX_VALUE;
+ private double doubleVal = Double.MAX_VALUE;
+
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+ private final boolean isLocalAgg;
+
+ public MinAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+ throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init() {
+ shortVal = Short.MAX_VALUE;
+ intVal = Integer.MAX_VALUE;
+ longVal = Long.MAX_VALUE;
+ floatVal = Float.MAX_VALUE;
+ doubleVal = Double.MAX_VALUE;
+
+ metInt8s = false;
+ metInt16s = false;
+ metInt32s = false;
+ metInt64s = false;
+ metFloats = false;
+ metDoubles = false;
+ metNull = false;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(inputVal.getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ metInt8s = true;
+ throw new NotImplementedException(
+ "no implementation for int8's comparator");
+ }
+ case INT16: {
+ metInt16s = true;
+ short val = AInt16SerializerDeserializer.getShort(
+ inputVal.getByteArray(), 1);
+ if (val < shortVal)
+ shortVal = val;
+ throw new NotImplementedException(
+ "no implementation for int16's comparator");
+ }
+ case INT32: {
+ metInt32s = true;
+ int val = AInt32SerializerDeserializer.getInt(
+ inputVal.getByteArray(), 1);
+ if (val < intVal)
+ intVal = val;
+ break;
+ }
+ case INT64: {
+ metInt64s = true;
+ long val = AInt64SerializerDeserializer.getLong(
+ inputVal.getByteArray(), 1);
+ if (val < longVal)
+ longVal = val;
+ break;
+ }
+ case FLOAT: {
+ metFloats = true;
+ float val = AFloatSerializerDeserializer.getFloat(
+ inputVal.getByteArray(), 1);
+ if (val < floatVal)
+ floatVal = val;
+ break;
+ }
+ case DOUBLE: {
+ metDoubles = true;
+ double val = ADoubleSerializerDeserializer.getDouble(
+ inputVal.getByteArray(), 1);
+ if (val < doubleVal)
+ doubleVal = val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException(
+ "Cannot compute SUM for values of type " + typeTag);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ if (metNull) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else if (metDoubles) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(doubleVal);
+ serde.serialize(aDouble, out);
+ } else if (metFloats) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue(floatVal);
+ serde.serialize(aFloat, out);
+ } else if (metInt64s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue(longVal);
+ serde.serialize(aInt64, out);
+ } else if (metInt32s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue(intVal);
+ serde.serialize(aInt32, out);
+ } else if (metInt16s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue(shortVal);
+ serde.serialize(aInt16, out);
+ } else if (metInt8s) {
+ throw new NotImplementedException(
+ "no implementation for int8's comparator");
+ } else {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
index 5e45432..ad2880a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
@@ -1,41 +1,15 @@
package edu.uci.ics.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
@@ -52,7 +26,6 @@
return FID;
}
- @SuppressWarnings("unchecked")
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
throws AlgebricksException {
@@ -62,145 +35,8 @@
@Override
public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
throws AlgebricksException {
-
- return new ICopyAggregateFunction() {
-
- private DataOutput out = provider.getDataOutput();
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
- private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
- private double sum;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
- private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
- @SuppressWarnings("rawtypes")
- private ISerializerDeserializer serde;
-
- @Override
- public void init() {
- metInt8s = false;
- metInt16s = false;
- metInt32s = false;
- metInt64s = false;
- metFloats = false;
- metDoubles = false;
- metNull = false;
- sum = 0.0;
- }
-
- @Override
- public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- if (inputVal.getLength() > 0) {
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
- .getByteArray()[0]);
- switch (typeTag) {
- case INT8: {
- metInt8s = true;
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT16: {
- metInt16s = true;
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT32: {
- metInt32s = true;
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case INT64: {
- metInt64s = true;
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case FLOAT: {
- metFloats = true;
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case DOUBLE: {
- metDoubles = true;
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
- sum += val;
- break;
- }
- case NULL: {
- metNull = true;
- break;
- }
- default: {
- throw new NotImplementedException("Cannot compute SUM for values of type "
- + typeTag);
- }
- }
- }
- }
-
- @Override
- public void finish() throws AlgebricksException {
- try {
- if (metNull) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- } else if (metDoubles) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(sum);
- serde.serialize(aDouble, out);
- } else if (metFloats) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
- } else if (metInt64s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
- } else if (metInt32s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
- } else if (metInt16s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
- } else if (metInt8s) {
- serde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
- } else {
- GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
- }
-
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
-
- }
-
- @Override
- public void finishPartial() throws AlgebricksException {
- finish();
- }
- };
- }
+ return new SumAggregateFunction(args, provider, false);
+ };
};
}
-
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
new file mode 100644
index 0000000..de33431
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -0,0 +1,181 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SumAggregateFunction implements ICopyAggregateFunction {
+ private DataOutput out;
+ private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+ private ICopyEvaluator eval;
+ private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
+ private double sum;
+ private AMutableDouble aDouble = new AMutableDouble(0);
+ private AMutableFloat aFloat = new AMutableFloat(0);
+ private AMutableInt64 aInt64 = new AMutableInt64(0);
+ private AMutableInt32 aInt32 = new AMutableInt32(0);
+ private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+ private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde;
+
+ private final boolean isLocalAgg;
+
+ public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+ throws AlgebricksException {
+ out = provider.getDataOutput();
+ eval = args[0].createEvaluator(inputVal);
+ this.isLocalAgg = isLocalAgg;
+ }
+
+ @Override
+ public void init() {
+ metInt8s = false;
+ metInt16s = false;
+ metInt32s = false;
+ metInt64s = false;
+ metFloats = false;
+ metDoubles = false;
+ metNull = false;
+ sum = 0.0;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ inputVal.reset();
+ eval.evaluate(tuple);
+ if (inputVal.getLength() > 0) {
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ switch (typeTag) {
+ case INT8: {
+ metInt8s = true;
+ byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT16: {
+ metInt16s = true;
+ short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT32: {
+ metInt32s = true;
+ int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case INT64: {
+ metInt64s = true;
+ long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case FLOAT: {
+ metFloats = true;
+ float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case DOUBLE: {
+ metDoubles = true;
+ double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ sum += val;
+ break;
+ }
+ case NULL: {
+ metNull = true;
+ break;
+ }
+ case SYSTEM_NULL: {
+ // For global aggregates simply ignore system null here,
+ // but if all input value are system null, then we should return
+ // null in finish().
+ if (isLocalAgg) {
+ throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish() throws AlgebricksException {
+ try {
+ if (metNull) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ } else if (metDoubles) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ aDouble.setValue(sum);
+ serde.serialize(aDouble, out);
+ } else if (metFloats) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+ aFloat.setValue((float) sum);
+ serde.serialize(aFloat, out);
+ } else if (metInt64s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ aInt64.setValue((long) sum);
+ serde.serialize(aInt64, out);
+ } else if (metInt32s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ aInt32.setValue((int) sum);
+ serde.serialize(aInt32, out);
+ } else if (metInt16s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+ aInt16.setValue((short) sum);
+ serde.serialize(aInt16, out);
+ } else if (metInt8s) {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+ aInt8.setValue((byte) sum);
+ serde.serialize(aInt8, out);
+ } else {
+ // Empty stream. For local agg return system null. For global agg return null.
+ if (isLocalAgg) {
+ out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+ } else {
+ serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ serde.serialize(ANull.NULL, out);
+ }
+ }
+ } catch (IOException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java
new file mode 100644
index 0000000..d1d90c0
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java
@@ -0,0 +1,89 @@
+package edu.uci.ics.asterix.runtime.aggregates.stream;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class EmptyStreamAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "empty-stream", 0);
+ public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ public IFunctionDescriptor createFunctionDescriptor() {
+ return new EmptyStreamAggregateDescriptor();
+ }
+ };
+
+ @Override
+ public ICopyAggregateFunctionFactory createAggregateFunctionFactory(ICopyEvaluatorFactory[] args)
+ throws AlgebricksException {
+ return new ICopyAggregateFunctionFactory() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ throws AlgebricksException {
+
+ return new ICopyAggregateFunction() {
+
+ private DataOutput out = provider.getDataOutput();
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+
+ boolean res = true;
+
+ @Override
+ public void init() throws AlgebricksException {
+ res = true;
+ }
+
+ @Override
+ public void step(IFrameTupleReference tuple) throws AlgebricksException {
+ res = false;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish() throws AlgebricksException {
+ ABoolean b = res ? ABoolean.TRUE : ABoolean.FALSE;
+ try {
+ serde.serialize(b, out);
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finishPartial() throws AlgebricksException {
+ finish();
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return FID;
+ }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index e9f1ac5..ff50d4a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -45,14 +45,19 @@
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableCountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.std.SumAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.stream.EmptyStreamAggregateDescriptor;
import edu.uci.ics.asterix.runtime.aggregates.stream.NonEmptyStreamAggregateDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.common.CreateMBREvalFactory;
import edu.uci.ics.asterix.runtime.evaluators.common.FieldAccessByIndexEvalFactory;
@@ -80,6 +85,7 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.AnyCollectionMemberDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.ClosedRecordConstructorDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.CodePointToStringDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.ContainsDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedGramTokensDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedWordTokensDescriptor;
@@ -107,10 +113,16 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.LenDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.LikeDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAbsDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAddDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericCeilingDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericDivideDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericFloorDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericModuloDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericMultiplyDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEven2Descriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEvenDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericSubtractDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NumericUnaryMinusDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.OpenRecordConstructorDescriptor;
@@ -129,33 +141,26 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.SpatialDistanceDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.SpatialIntersectDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.StartsWithDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringConcatDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringEndWithDescrtiptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringEqualDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringJoinDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringLengthDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringLowerCaseDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesWithFlagDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceWithFlagsDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringStartWithDescrtiptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringToCodePointDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.Substring2Descriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringAfterDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringBeforeDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.SwitchCaseDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.UnorderedListConstructorDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.WordTokensDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.YearDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAbsDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericCeilingDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericFloorDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEvenDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEven2Descriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringEqualDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringStartWithDescrtiptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringEndWithDescrtiptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringLowerCaseDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesWithFlagDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceWithFlagsDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringLengthDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.Substring2Descriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringBeforeDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringAfterDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringToCodePointDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.CodePointToStringDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringConcatDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringJoinDescriptor;
import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
import edu.uci.ics.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
@@ -265,6 +270,7 @@
temp.add(IsNullDescriptor.FACTORY);
temp.add(NotDescriptor.FACTORY);
temp.add(LenDescriptor.FACTORY);
+ temp.add(EmptyStreamAggregateDescriptor.FACTORY);
temp.add(NonEmptyStreamAggregateDescriptor.FACTORY);
temp.add(RangeDescriptor.FACTORY);
@@ -300,8 +306,11 @@
temp.add(LocalAvgAggregateDescriptor.FACTORY);
temp.add(GlobalAvgAggregateDescriptor.FACTORY);
temp.add(SumAggregateDescriptor.FACTORY);
+ temp.add(LocalSumAggregateDescriptor.FACTORY);
temp.add(MaxAggregateDescriptor.FACTORY);
+ temp.add(LocalMaxAggregateDescriptor.FACTORY);
temp.add(MinAggregateDescriptor.FACTORY);
+ temp.add(LocalMinAggregateDescriptor.FACTORY);
// serializable aggregates
temp.add(SerializableCountAggregateDescriptor.FACTORY);
@@ -309,6 +318,7 @@
temp.add(SerializableLocalAvgAggregateDescriptor.FACTORY);
temp.add(SerializableGlobalAvgAggregateDescriptor.FACTORY);
temp.add(SerializableSumAggregateDescriptor.FACTORY);
+ temp.add(SerializableLocalSumAggregateDescriptor.FACTORY);
// new functions - constructors
temp.add(ABooleanConstructorDescriptor.FACTORY);