Merged asterix_stabilization r492:r527.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@528 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index f065c3e..a01af41 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
 import edu.uci.ics.asterix.optimizer.rules.NestGroupByRule;
 import edu.uci.ics.asterix.optimizer.rules.PullPositionalVariableFromUnnestRule;
+import edu.uci.ics.asterix.optimizer.rules.PushAggFuncIntoStandaloneAggregateRule;
 import edu.uci.ics.asterix.optimizer.rules.PushAggregateIntoGroupbyRule;
 import edu.uci.ics.asterix.optimizer.rules.PushFieldAccessRule;
 import edu.uci.ics.asterix.optimizer.rules.PushGroupByThroughProduct;
@@ -62,8 +63,8 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.InsertProjectBeforeUnionRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroHashPartitionMergeExchange;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroJoinInsideSubplanRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceCombinerRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForStandaloneAggregRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceAggregateCombinerRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForSubplanRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
@@ -93,7 +94,7 @@
     public final static List<IAlgebraicRewriteRule> buildNormalizationRuleCollection() {
         List<IAlgebraicRewriteRule> normalization = new LinkedList<IAlgebraicRewriteRule>();
         normalization.add(new EliminateSubplanRule());
-        normalization.add(new IntroduceGroupByForStandaloneAggregRule());
+        normalization.add(new PushAggFuncIntoStandaloneAggregateRule());
         normalization.add(new BreakSelectIntoConjunctsRule());
         normalization.add(new ExtractGbyExpressionsRule());
         normalization.add(new ExtractDistinctByExpressionsRule());
@@ -166,7 +167,8 @@
         consolidation.add(new ConsolidateSelectsRule());
         consolidation.add(new ConsolidateAssignsRule());
         consolidation.add(new InlineAssignIntoAggregateRule());
-        consolidation.add(new IntroduceCombinerRule());
+        consolidation.add(new IntroduceGroupByCombinerRule());
+        consolidation.add(new IntroduceAggregateCombinerRule());
         consolidation.add(new CountVarToCountOneRule());
         consolidation.add(new IntroduceSelectAccessMethodRule());
         consolidation.add(new IntroduceJoinAccessMethodRule());
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
new file mode 100644
index 0000000..c5a1cb0
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes aggregate functions into a stand alone aggregate operator (no group by). 
+ */
+public class PushAggFuncIntoStandaloneAggregateRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        // Pattern to match: assign <-- aggregate <-- !(group-by)
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        // If there's a group by below the agg, then we want to have the agg pushed into the group by.
+        Mutable<ILogicalOperator> opRef3 = op2.getInputs().get(0);
+        AbstractLogicalOperator op3 = (AbstractLogicalOperator) opRef3.getValue();
+        if (op3.getOperatorTag() == LogicalOperatorTag.GROUP) {
+            return false;
+        }
+        
+        AssignOperator assignOp = (AssignOperator) op;
+        AggregateOperator aggOp = (AggregateOperator) op2;
+        if (aggOp.getVariables().size() != 1) {
+            return false;
+        }
+
+        // Make sure the agg expr is a listify.
+        ILogicalExpression aggExpr = aggOp.getExpressions().get(0).getValue();
+        if (aggExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression origAggFuncExpr = (AbstractFunctionCallExpression) aggExpr;
+        if (origAggFuncExpr.getFunctionIdentifier() != AsterixBuiltinFunctions.LISTIFY) {
+            return false;
+        }
+        
+        LogicalVariable aggVar = aggOp.getVariables().get(0);
+        List<LogicalVariable> used = new LinkedList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(assignOp, used);
+        if (!used.contains(aggVar)) {
+            return false;
+        }
+        
+        Mutable<ILogicalExpression> srcAssignExprRef = fingAggFuncExprRef(assignOp.getExpressions(), aggVar);
+        if (srcAssignExprRef == null) {
+        	return false;
+        }
+        AbstractFunctionCallExpression assignFuncExpr = (AbstractFunctionCallExpression) srcAssignExprRef.getValue();
+        FunctionIdentifier aggFuncIdent = AsterixBuiltinFunctions.getAggregateFunction(assignFuncExpr.getFunctionIdentifier());
+        
+        // Push the agg func into the agg op.                
+        AbstractFunctionCallExpression aggOpExpr = (AbstractFunctionCallExpression) aggOp.getExpressions().get(0).getValue();
+        List<Mutable<ILogicalExpression>> aggArgs = new ArrayList<Mutable<ILogicalExpression>>();
+        aggArgs.add(aggOpExpr.getArguments().get(0));
+        AggregateFunctionCallExpression aggFuncExpr = AsterixBuiltinFunctions.makeAggregateFunctionExpression(aggFuncIdent, aggArgs);
+        aggOp.getExpressions().get(0).setValue(aggFuncExpr);
+        
+        // The assign now just "renames" the variable to make sure the upstream plan still works.
+        srcAssignExprRef.setValue(new VariableReferenceExpression(aggVar));
+
+        // Create a new assign for a TRUE variable.
+        LogicalVariable trueVar = context.newVar();
+        AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+        
+        ILogicalOperator aggInput = aggOp.getInputs().get(0).getValue();
+        aggOp.getInputs().get(0).setValue(trueAssignOp);
+        trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(aggInput));
+        
+        // Set partitioning variable.
+        aggOp.setPartitioningVariable(trueVar);
+        
+        context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
+        context.computeAndSetTypeEnvironmentForOperator(aggOp);
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+        
+        return true;
+    }
+    
+    private Mutable<ILogicalExpression> fingAggFuncExprRef(List<Mutable<ILogicalExpression>> exprRefs, LogicalVariable aggVar) {
+    	for (Mutable<ILogicalExpression> exprRef : exprRefs) {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                continue;
+            }
+            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+            FunctionIdentifier funcIdent = AsterixBuiltinFunctions.getAggregateFunction(funcExpr.getFunctionIdentifier());
+            if (funcIdent == null) {
+            	// Recursively look in func args.
+            	return fingAggFuncExprRef(funcExpr.getArguments(), aggVar);
+            }
+            // Check if this is the expr that uses aggVar.
+            Collection<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
+            funcExpr.getUsedVariables(usedVars);
+            if (usedVars.contains(aggVar)) {
+            	return exprRef;
+            }
+    	}
+    	return null;
+    }
+}
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 1cf8519..82051f9 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -37,3 +37,8 @@
 quantifiers/somesat_05.aql
 quantifiers/everysat_02.aql
 quantifiers/everysat_03.aql
+aggregate/avg_empty_02.aql
+aggregate/min_empty_01.aql
+aggregate/min_empty_02.aql
+aggregate/max_empty_01.aql
+aggregate/max_empty_02.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql
new file mode 100644
index 0000000..ab2a6fb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that avg aggregation correctly returns null for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_avg_empty_01.adm";
+
+avg(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql
new file mode 100644
index 0000000..3583ce0
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that avg aggregation correctly returns null for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_avg_empty_02.adm";
+
+avg(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql
new file mode 100644
index 0000000..f03c252
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that count aggregation correctly returns 0 for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_count_empty_01.adm";
+
+count(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql
new file mode 100644
index 0000000..0a6cc8e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that count aggregation correctly returns 0 for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_count_empty_02.adm";
+
+count(
+ for $x in dataset('Test')
+ return $x.val
+)
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql
new file mode 100644
index 0000000..2464b64
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that max aggregation correctly returns null for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_max_empty_01.adm";
+
+max(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql
new file mode 100644
index 0000000..79ae1d8
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that max aggregation correctly returns null for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_max_empty_02.adm";
+
+max(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql
new file mode 100644
index 0000000..30abd1e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that min aggregation correctly returns null for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_min_empty_01.adm";
+
+min(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql
new file mode 100644
index 0000000..99d49f4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that min aggregation correctly returns null for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_min_empty_02.adm";
+
+min(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql
new file mode 100644
index 0000000..b4e26b8
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that sum aggregation correctly returns null for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_sum_empty_01.adm";
+
+sum(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql
new file mode 100644
index 0000000..a94457a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that sum aggregation correctly returns null for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_sum_empty_02.adm";
+
+sum(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql b/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql
new file mode 100644
index 0000000..6e8892c
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that existential quantification returns true/false correctly.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/quantifiers_somesat_06.adm";
+
+let $x := [
+some $x in [false,false] satisfies $x,
+some $x in [true,false] satisfies $x,
+some $x in [false,true] satisfies $x,
+some $x in [true,true] satisfies $x,
+some $x in [false,false] satisfies not($x),
+some $x in [true,false] satisfies not($x),
+some $x in [false,true] satisfies not($x),
+some $x in [true,true] satisfies not($x)
+]
+for $i in $x
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm
@@ -0,0 +1 @@
+0
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm
@@ -0,0 +1 @@
+0
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm
new file mode 100644
index 0000000..2cc1766
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm
@@ -0,0 +1,8 @@
+false
+true
+true
+true
+true
+true
+true
+false
\ No newline at end of file
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 417948f..8669ef0 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -234,6 +234,7 @@
     public final static FunctionIdentifier AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-avg", 1);
     public final static FunctionIdentifier COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-count", 1);
     public final static FunctionIdentifier SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sum", 1);
+    public final static FunctionIdentifier LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum", 1);
     public final static FunctionIdentifier MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-max", 1);
     public final static FunctionIdentifier MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-min", 1);
     public final static FunctionIdentifier GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -259,6 +260,8 @@
             "count-serial", 1);
     public final static FunctionIdentifier SERIAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "sum-serial", 1);
+    public final static FunctionIdentifier SERIAL_LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sum-serial", 1);
     public final static FunctionIdentifier SERIAL_GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "global-avg-serial", 1);
     public final static FunctionIdentifier SERIAL_LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -571,6 +574,7 @@
         add(SERIAL_GLOBAL_AVG, OptionalADoubleTypeComputer.INSTANCE);
         add(SERIAL_LOCAL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE);
         add(SERIAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
+        add(SERIAL_LOCAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
         add(SIMILARITY_JACCARD, AFloatTypeComputer.INSTANCE);
         add(SIMILARITY_JACCARD_CHECK, OrderedListOfAnyTypeComputer.INSTANCE);
         add(SIMILARITY_JACCARD_SORTED, AFloatTypeComputer.INSTANCE);
@@ -622,6 +626,7 @@
         });
         add(SUBSTRING, SubstringTypeComputer.INSTANCE);
         add(SUM, NonTaggedSumTypeComputer.INSTANCE);
+        add(LOCAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
         add(SWITCH_CASE, NonTaggedSwitchCaseComputer.INSTANCE);
         add(REG_EXP, ABooleanTypeComputer.INSTANCE);
         add(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE);
@@ -679,7 +684,7 @@
         addGlobalAgg(MIN, MIN);
 
         addAgg(SUM);
-        addLocalAgg(SUM, SUM);
+        addLocalAgg(SUM, LOCAL_SUM);
         addGlobalAgg(SUM, SUM);
 
         addAgg(LISTIFY);
@@ -688,6 +693,7 @@
         addSerialAgg(AVG, SERIAL_AVG);
         addSerialAgg(COUNT, SERIAL_COUNT);
         addSerialAgg(SUM, SERIAL_SUM);
+        addSerialAgg(LOCAL_SUM, SERIAL_LOCAL_SUM);
         addSerialAgg(LOCAL_AVG, SERIAL_LOCAL_AVG);
         addSerialAgg(GLOBAL_AVG, SERIAL_GLOBAL_AVG);
 
@@ -702,7 +708,8 @@
         addGlobalAgg(SERIAL_AVG, SERIAL_GLOBAL_AVG);
 
         addAgg(SERIAL_SUM);
-        addLocalAgg(SERIAL_SUM, SERIAL_SUM);
+        addAgg(SERIAL_LOCAL_SUM);
+        addLocalAgg(SERIAL_SUM, SERIAL_LOCAL_SUM);
         addGlobalAgg(SERIAL_SUM, SERIAL_SUM);
     }
 
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
index aac5447..b75c074 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
@@ -23,7 +23,7 @@
     FLOAT(11),
     DOUBLE(12),
     STRING(13),
-    NULL(14),
+    NULL(14),    
     BOOLEAN(15),
     DATETIME(16),
     DATE(17),
@@ -42,7 +42,8 @@
     LINE(30),
     POLYGON(31),
     CIRCLE(32),
-    RECTANGLE(33);
+    RECTANGLE(33),
+    SYSTEM_NULL(34);
 
     private byte value;
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
new file mode 100644
index 0000000..d080b27
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+    
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sum-serial", 1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableLocalSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSumAggregateFunction(args, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
index e689ab4..b97e35b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
@@ -1,40 +1,14 @@
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -55,182 +29,12 @@
     public ICopySerializableAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = 1L;           
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableFloat aFloat = new AMutableFloat(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-                    private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-                    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-                    @SuppressWarnings("rawtypes")
-                    private ISerializerDeserializer serde;
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeDouble(0.0);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        int pos = start;
-                        boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
-                        double sum = BufferSerDeUtil.getDouble(state, pos);
-
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    metInt8s = true;
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    metInt16s = true;
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    metInt32s = true;
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    metInt64s = true;
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    metFloats = true;
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    metDoubles = true;
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                                            + typeTag);
-                                }
-                            }
-                        }
-
-                        pos = start;
-                        BufferSerDeUtil.writeBoolean(metInt8s, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metInt16s, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metInt32s, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metInt64s, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metNull, state, pos++);
-                        BufferSerDeUtil.writeDouble(sum, state, pos);
-                    }
-
-                    @SuppressWarnings("unchecked")
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-                        int pos = start;
-                        boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
-                        double sum = BufferSerDeUtil.getDouble(state, pos);
-                        try {
-                            if (metNull) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ANULL);
-                                serde.serialize(ANull.NULL, out);
-                            } else if (metDoubles) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                                aDouble.setValue(sum);
-                                serde.serialize(aDouble, out);
-                            } else if (metFloats) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                                aFloat.setValue((float) sum);
-                                serde.serialize(aFloat, out);
-                            } else if (metInt64s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT64);
-                                aInt64.setValue((long) sum);
-                                serde.serialize(aInt64, out);
-                            } else if (metInt32s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT32);
-                                aInt32.setValue((int) sum);
-                                serde.serialize(aInt32, out);
-                            } else if (metInt16s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT16);
-                                aInt16.setValue((short) sum);
-                                serde.serialize(aInt16, out);
-                            } else if (metInt8s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT8);
-                                aInt8.setValue((byte) sum);
-                                serde.serialize(aInt8, out);
-                            } else {
-                                GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
-                            }
-
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput out)
-                            throws AlgebricksException {
-                        finish(state, start, len, out);
-                    }
-                };
+                return new SerializableSumAggregateFunction(args, false);
             }
         };
     }
-
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
new file mode 100644
index 0000000..9831c28
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -0,0 +1,229 @@
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    private ISerializerDeserializer serde;
+    private final boolean isLocalAgg = false;
+    
+    public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+    }
+    
+    @Override
+    public void init(DataOutput state) throws AlgebricksException {
+        try {
+            state.writeBoolean(false);
+            state.writeBoolean(false);
+            state.writeBoolean(false);
+            state.writeBoolean(false);
+            state.writeBoolean(false);
+            state.writeBoolean(false);
+            state.writeBoolean(false);
+            state.writeBoolean(false);
+            state.writeDouble(0.0);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
+            throws AlgebricksException {
+        int pos = start;
+        boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metSystemNull = BufferSerDeUtil.getBoolean(state, pos++);
+        double sum = BufferSerDeUtil.getDouble(state, pos);
+
+        inputVal.reset();
+        eval.evaluate(tuple);
+        if (inputVal.getLength() > 0) {
+            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
+                    .getByteArray()[0]);
+            switch (typeTag) {
+                case INT8: {
+                    metInt8s = true;
+                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT16: {
+                    metInt16s = true;
+                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT32: {
+                    metInt32s = true;
+                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT64: {
+                    metInt64s = true;
+                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case FLOAT: {
+                    metFloats = true;
+                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case DOUBLE: {
+                    metDoubles = true;
+                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case NULL: {
+                    metNull = true;
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    metSystemNull = true;
+                    // For global aggregates simply ignore system null here,
+                    // but if all input value are system null, then we should return
+                    // null in finish().
+                    if (isLocalAgg) {
+                        throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+                    }
+                    break;
+                }
+                default: {
+                    throw new NotImplementedException("Cannot compute SUM for values of type "
+                            + typeTag);
+                }
+            }
+        }
+
+        pos = start;
+        BufferSerDeUtil.writeBoolean(metInt8s, state, pos++);
+        BufferSerDeUtil.writeBoolean(metInt16s, state, pos++);
+        BufferSerDeUtil.writeBoolean(metInt32s, state, pos++);
+        BufferSerDeUtil.writeBoolean(metInt64s, state, pos++);
+        BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
+        BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
+        BufferSerDeUtil.writeBoolean(metNull, state, pos++);
+        BufferSerDeUtil.writeBoolean(metSystemNull, state, pos++);
+        BufferSerDeUtil.writeDouble(sum, state, pos);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        int pos = start;
+        boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
+        boolean metSystemNull = BufferSerDeUtil.getBoolean(state, pos++);
+        double sum = BufferSerDeUtil.getDouble(state, pos);
+        try {
+            if (metNull) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.ANULL);
+                serde.serialize(ANull.NULL, out);
+            } else if (metDoubles) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
+                aDouble.setValue(sum);
+                serde.serialize(aDouble, out);
+            } else if (metFloats) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AFLOAT);
+                aFloat.setValue((float) sum);
+                serde.serialize(aFloat, out);
+            } else if (metInt64s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AINT64);
+                aInt64.setValue((long) sum);
+                serde.serialize(aInt64, out);
+            } else if (metInt32s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AINT32);
+                aInt32.setValue((int) sum);
+                serde.serialize(aInt32, out);
+            } else if (metInt16s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AINT16);
+                aInt16.setValue((short) sum);
+                serde.serialize(aInt16, out);
+            } else if (metInt8s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AINT8);
+                aInt8.setValue((byte) sum);
+                serde.serialize(aInt8, out);
+            } else if (metSystemNull) {
+                // All input values must have been of type system null.
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                serde.serialize(ANull.NULL, out);
+            } else {
+                // Empty stream. For local agg return system null. For global agg return null.
+                if (isLocalAgg) {
+                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                } else {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                }
+            }
+
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput out)
+            throws AlgebricksException {
+        finish(state, start, len, out);
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index ecdd194..4b43a0b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -39,6 +39,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ByteArrayAccessibleOutputStream;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
@@ -168,6 +169,11 @@
                     public void finish() throws AlgebricksException {
                         if (count == 0) {
                             GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
+                            try {
+								nullSerde.serialize(ANull.NULL, out);
+							} catch (HyracksDataException e) {
+								throw new AlgebricksException(e);
+							}
                         } else {
                             try {
                                 if (metNull)
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
new file mode 100644
index 0000000..b2f3e57
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum",
+            1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SumAggregateFunction(args, provider, true);
+            };
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
index 5e45432..ad2880a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
@@ -1,41 +1,15 @@
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -52,7 +26,6 @@
         return FID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
@@ -62,145 +35,8 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-                    private double sum;
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableFloat aFloat = new AMutableFloat(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-                    private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-                    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-                    @SuppressWarnings("rawtypes")
-                    private ISerializerDeserializer serde;
-
-                    @Override
-                    public void init() {
-                        metInt8s = false;
-                        metInt16s = false;
-                        metInt32s = false;
-                        metInt64s = false;
-                        metFloats = false;
-                        metDoubles = false;
-                        metNull = false;
-                        sum = 0.0;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    metInt8s = true;
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    metInt16s = true;
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    metInt32s = true;
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    metInt64s = true;
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    metFloats = true;
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    metDoubles = true;
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                                            + typeTag);
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (metNull) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ANULL);
-                                serde.serialize(ANull.NULL, out);
-                            } else if (metDoubles) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                                aDouble.setValue(sum);
-                                serde.serialize(aDouble, out);
-                            } else if (metFloats) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                                aFloat.setValue((float) sum);
-                                serde.serialize(aFloat, out);
-                            } else if (metInt64s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT64);
-                                aInt64.setValue((long) sum);
-                                serde.serialize(aInt64, out);
-                            } else if (metInt32s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT32);
-                                aInt32.setValue((int) sum);
-                                serde.serialize(aInt32, out);
-                            } else if (metInt16s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT16);
-                                aInt16.setValue((short) sum);
-                                serde.serialize(aInt16, out);
-                            } else if (metInt8s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT8);
-                                aInt8.setValue((byte) sum);
-                                serde.serialize(aInt8, out);
-                            } else {
-                                GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
-                            }
-
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
-            }
+                return new SumAggregateFunction(args, provider, false);
+            };
         };
     }
-
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
new file mode 100644
index 0000000..282d7b3
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -0,0 +1,187 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SumAggregateFunction implements ICopyAggregateFunction {
+    private DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull, metSystemNull;
+    private double sum;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    private ISerializerDeserializer serde;
+
+    private final boolean isLocalAgg;
+
+    public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    public void init() {
+        metInt8s = false;
+        metInt16s = false;
+        metInt32s = false;
+        metInt64s = false;
+        metFloats = false;
+        metDoubles = false;
+        metNull = false;
+        metSystemNull = false;
+        sum = 0.0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        if (inputVal.getLength() > 0) {
+            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+            switch (typeTag) {
+                case INT8: {
+                    metInt8s = true;
+                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT16: {
+                    metInt16s = true;
+                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT32: {
+                    metInt32s = true;
+                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case INT64: {
+                    metInt64s = true;
+                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case FLOAT: {
+                    metFloats = true;
+                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case DOUBLE: {
+                    metDoubles = true;
+                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                    sum += val;
+                    break;
+                }
+                case NULL: {
+                    metNull = true;
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    metSystemNull = true;
+                    // For global aggregates simply ignore system null here,
+                    // but if all input value are system null, then we should return
+                    // null in finish().
+                    if (isLocalAgg) {
+                        throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+                    }
+                    break;
+                }
+                default: {
+                    throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag);
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            if (metNull) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                serde.serialize(ANull.NULL, out);
+            } else if (metDoubles) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+                aDouble.setValue(sum);
+                serde.serialize(aDouble, out);
+            } else if (metFloats) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+                aFloat.setValue((float) sum);
+                serde.serialize(aFloat, out);
+            } else if (metInt64s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+                aInt64.setValue((long) sum);
+                serde.serialize(aInt64, out);
+            } else if (metInt32s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+                aInt32.setValue((int) sum);
+                serde.serialize(aInt32, out);
+            } else if (metInt16s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+                aInt16.setValue((short) sum);
+                serde.serialize(aInt16, out);
+            } else if (metInt8s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+                aInt8.setValue((byte) sum);
+                serde.serialize(aInt8, out);
+            } else if (metSystemNull) {
+                // All input values must have been of type system null.
+                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                serde.serialize(ANull.NULL, out);
+            } else {
+                // Empty stream. For local agg return system null. For global agg return null.
+                if (isLocalAgg) {
+                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                } else {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index e9f1ac5..e17a4dd 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -45,11 +45,13 @@
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.SumAggregateDescriptor;
@@ -300,6 +302,7 @@
         temp.add(LocalAvgAggregateDescriptor.FACTORY);
         temp.add(GlobalAvgAggregateDescriptor.FACTORY);
         temp.add(SumAggregateDescriptor.FACTORY);
+        temp.add(LocalSumAggregateDescriptor.FACTORY);
         temp.add(MaxAggregateDescriptor.FACTORY);
         temp.add(MinAggregateDescriptor.FACTORY);
 
@@ -309,6 +312,7 @@
         temp.add(SerializableLocalAvgAggregateDescriptor.FACTORY);
         temp.add(SerializableGlobalAvgAggregateDescriptor.FACTORY);
         temp.add(SerializableSumAggregateDescriptor.FACTORY);
+        temp.add(SerializableLocalSumAggregateDescriptor.FACTORY);
 
         // new functions - constructors
         temp.add(ABooleanConstructorDescriptor.FACTORY);