Reintegrated asterix_fix_agg. Adds scalar aggregates, fixes existential and universal quantification, and corrects behavior of aggregate functions for empty inputs. Various other minor bug fixes. Fixes issues 136, 138, 139.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@654 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 754067a..0f28239 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
 import edu.uci.ics.asterix.optimizer.rules.NestGroupByRule;
 import edu.uci.ics.asterix.optimizer.rules.PullPositionalVariableFromUnnestRule;
+import edu.uci.ics.asterix.optimizer.rules.PushAggFuncIntoStandaloneAggregateRule;
 import edu.uci.ics.asterix.optimizer.rules.PushAggregateIntoGroupbyRule;
 import edu.uci.ics.asterix.optimizer.rules.PushFieldAccessRule;
 import edu.uci.ics.asterix.optimizer.rules.PushGroupByThroughProduct;
@@ -63,8 +64,8 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.InsertProjectBeforeUnionRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroHashPartitionMergeExchange;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroJoinInsideSubplanRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceCombinerRule;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForStandaloneAggregRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceAggregateCombinerRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForSubplanRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
@@ -94,7 +95,7 @@
     public final static List<IAlgebraicRewriteRule> buildNormalizationRuleCollection() {
         List<IAlgebraicRewriteRule> normalization = new LinkedList<IAlgebraicRewriteRule>();
         normalization.add(new EliminateSubplanRule());
-        normalization.add(new IntroduceGroupByForStandaloneAggregRule());
+        normalization.add(new PushAggFuncIntoStandaloneAggregateRule());
         normalization.add(new BreakSelectIntoConjunctsRule());
         normalization.add(new ExtractGbyExpressionsRule());
         normalization.add(new ExtractDistinctByExpressionsRule());
@@ -168,7 +169,8 @@
         consolidation.add(new ConsolidateSelectsRule());
         consolidation.add(new ConsolidateAssignsRule());
         consolidation.add(new InlineAssignIntoAggregateRule());
-        consolidation.add(new IntroduceCombinerRule());
+        consolidation.add(new IntroduceGroupByCombinerRule());
+        consolidation.add(new IntroduceAggregateCombinerRule());
         consolidation.add(new CountVarToCountOneRule());
         consolidation.add(new IntroduceSelectAccessMethodRule());
         consolidation.add(new IntroduceJoinAccessMethodRule());
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
new file mode 100644
index 0000000..c5a1cb0
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes aggregate functions into a stand alone aggregate operator (no group by). 
+ */
+public class PushAggFuncIntoStandaloneAggregateRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        // Pattern to match: assign <-- aggregate <-- !(group-by)
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        // If there's a group by below the agg, then we want to have the agg pushed into the group by.
+        Mutable<ILogicalOperator> opRef3 = op2.getInputs().get(0);
+        AbstractLogicalOperator op3 = (AbstractLogicalOperator) opRef3.getValue();
+        if (op3.getOperatorTag() == LogicalOperatorTag.GROUP) {
+            return false;
+        }
+        
+        AssignOperator assignOp = (AssignOperator) op;
+        AggregateOperator aggOp = (AggregateOperator) op2;
+        if (aggOp.getVariables().size() != 1) {
+            return false;
+        }
+
+        // Make sure the agg expr is a listify.
+        ILogicalExpression aggExpr = aggOp.getExpressions().get(0).getValue();
+        if (aggExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression origAggFuncExpr = (AbstractFunctionCallExpression) aggExpr;
+        if (origAggFuncExpr.getFunctionIdentifier() != AsterixBuiltinFunctions.LISTIFY) {
+            return false;
+        }
+        
+        LogicalVariable aggVar = aggOp.getVariables().get(0);
+        List<LogicalVariable> used = new LinkedList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(assignOp, used);
+        if (!used.contains(aggVar)) {
+            return false;
+        }
+        
+        Mutable<ILogicalExpression> srcAssignExprRef = fingAggFuncExprRef(assignOp.getExpressions(), aggVar);
+        if (srcAssignExprRef == null) {
+        	return false;
+        }
+        AbstractFunctionCallExpression assignFuncExpr = (AbstractFunctionCallExpression) srcAssignExprRef.getValue();
+        FunctionIdentifier aggFuncIdent = AsterixBuiltinFunctions.getAggregateFunction(assignFuncExpr.getFunctionIdentifier());
+        
+        // Push the agg func into the agg op.                
+        AbstractFunctionCallExpression aggOpExpr = (AbstractFunctionCallExpression) aggOp.getExpressions().get(0).getValue();
+        List<Mutable<ILogicalExpression>> aggArgs = new ArrayList<Mutable<ILogicalExpression>>();
+        aggArgs.add(aggOpExpr.getArguments().get(0));
+        AggregateFunctionCallExpression aggFuncExpr = AsterixBuiltinFunctions.makeAggregateFunctionExpression(aggFuncIdent, aggArgs);
+        aggOp.getExpressions().get(0).setValue(aggFuncExpr);
+        
+        // The assign now just "renames" the variable to make sure the upstream plan still works.
+        srcAssignExprRef.setValue(new VariableReferenceExpression(aggVar));
+
+        // Create a new assign for a TRUE variable.
+        LogicalVariable trueVar = context.newVar();
+        AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+        
+        ILogicalOperator aggInput = aggOp.getInputs().get(0).getValue();
+        aggOp.getInputs().get(0).setValue(trueAssignOp);
+        trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(aggInput));
+        
+        // Set partitioning variable.
+        aggOp.setPartitioningVariable(trueVar);
+        
+        context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
+        context.computeAndSetTypeEnvironmentForOperator(aggOp);
+        context.computeAndSetTypeEnvironmentForOperator(assignOp);
+        
+        return true;
+    }
+    
+    private Mutable<ILogicalExpression> fingAggFuncExprRef(List<Mutable<ILogicalExpression>> exprRefs, LogicalVariable aggVar) {
+    	for (Mutable<ILogicalExpression> exprRef : exprRefs) {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                continue;
+            }
+            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+            FunctionIdentifier funcIdent = AsterixBuiltinFunctions.getAggregateFunction(funcExpr.getFunctionIdentifier());
+            if (funcIdent == null) {
+            	// Recursively look in func args.
+            	return fingAggFuncExprRef(funcExpr.getArguments(), aggVar);
+            }
+            // Check if this is the expr that uses aggVar.
+            Collection<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
+            funcExpr.getUsedVariables(usedVars);
+            if (usedVars.contains(aggVar)) {
+            	return exprRef;
+            }
+    	}
+    	return null;
+    }
+}
diff --git a/asterix-app/src/test/resources/optimizerts/results/unnest-to-join_02.plan b/asterix-app/src/test/resources/optimizerts/results/unnest-to-join_02.plan
index bfb15c7..1f56c4f 100644
--- a/asterix-app/src/test/resources/optimizerts/results/unnest-to-join_02.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/unnest-to-join_02.plan
@@ -1,21 +1,11 @@
 -- SINK_WRITE  |UNPARTITIONED|
   -- STREAM_PROJECT  |UNPARTITIONED|
-    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-      -- PRE_CLUSTERED_GROUP_BY[$$8]  |UNPARTITIONED|
-              {
-                -- AGGREGATE  |UNPARTITIONED|
-                  -- NESTED_TUPLE_SOURCE  |UNPARTITIONED|
-              }
-        -- ONE_TO_ONE_EXCHANGE  |LOCAL|
-          -- STABLE_SORT [$$8(ASC)]  |LOCAL|
-            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-              -- STREAM_PROJECT  |UNPARTITIONED|
-                -- ASSIGN  |UNPARTITIONED|
-                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                    -- HYBRID_HASH_JOIN [$$0][$$1]  |UNPARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                        -- UNNEST  |UNPARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                        -- UNNEST  |UNPARTITIONED|
-                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+    -- AGGREGATE  |UNPARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+        -- HYBRID_HASH_JOIN [$$0][$$1]  |UNPARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+            -- UNNEST  |UNPARTITIONED|
+              -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+            -- UNNEST  |UNPARTITIONED|
+              -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index f5ae7fa..f3aef16 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -4,7 +4,6 @@
 fuzzyjoin/dblp-splits-3_1.aql
 fuzzyjoin/events-users-aqlplus_1.aql
 subset-collection/04.aql
-quantifiers/everysat_01.aql
 custord/freq-clerk.aql
 custord/denorm-cust-order_01.aql
 custord/denorm-cust-order_03.aql
@@ -32,4 +31,4 @@
 quantifiers/everysat_02.aql
 quantifiers/everysat_03.aql
 flwor
-string/startwith03.aql
+string/startwith03.aql
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql
new file mode 100644
index 0000000..ab2a6fb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that avg aggregation correctly returns null for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_avg_empty_01.adm";
+
+avg(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql
new file mode 100644
index 0000000..3583ce0
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/avg_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that avg aggregation correctly returns null for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_avg_empty_02.adm";
+
+avg(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql
new file mode 100644
index 0000000..f03c252
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that count aggregation correctly returns 0 for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_count_empty_01.adm";
+
+count(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql
new file mode 100644
index 0000000..0a6cc8e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/count_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that count aggregation correctly returns 0 for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_count_empty_02.adm";
+
+count(
+ for $x in dataset('Test')
+ return $x.val
+)
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql
new file mode 100644
index 0000000..2464b64
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that max aggregation correctly returns null for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_max_empty_01.adm";
+
+max(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql
new file mode 100644
index 0000000..79ae1d8
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/max_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that max aggregation correctly returns null for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_max_empty_02.adm";
+
+max(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql
new file mode 100644
index 0000000..30abd1e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that min aggregation correctly returns null for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_min_empty_01.adm";
+
+min(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql
new file mode 100644
index 0000000..99d49f4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/min_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that min aggregation correctly returns null for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_min_empty_02.adm";
+
+min(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg.aql
new file mode 100644
index 0000000..8c8ed73
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg.aql
@@ -0,0 +1,19 @@
+/*
+ * Description    : Tests the scalar version of avg without nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_avg.adm";
+
+let $i8 := avg([int8("1"), int8("2"), int8("3")])
+let $i16 := avg([int16("1"), int16("2"), int16("3")])
+let $i32 := avg([int32("1"), int32("2"), int32("3")])
+let $i64 := avg([int64("1"), int64("2"), int64("3")])
+let $f := avg([float("1"), float("2"), float("3")])
+let $d := avg([double("1"), double("2"), double("3")])
+for $i in [$i8, $i16, $i32, $i64, $f, $d]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg_empty.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg_empty.aql
new file mode 100644
index 0000000..351f5f8
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg_empty.aql
@@ -0,0 +1,12 @@
+/*
+ * Description    : Tests the scalar version of avg with an empty list.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_avg_empty.adm";
+
+avg([ ])
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg_null.aql
new file mode 100644
index 0000000..5108573
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_avg_null.aql
@@ -0,0 +1,19 @@
+/*
+ * Description    : Tests the scalar version of avg with nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_avg_null.adm";
+
+let $i8 := avg([int8("1"), int8("2"), int8("3"), null])
+let $i16 := avg([int16("1"), int16("2"), int16("3"), null])
+let $i32 := avg([int32("1"), int32("2"), int32("3"), null])
+let $i64 := avg([int64("1"), int64("2"), int64("3"), null])
+let $f := avg([float("1"), float("2"), float("3"), null])
+let $d := avg([double("1"), double("2"), double("3"), null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count.aql
new file mode 100644
index 0000000..3508fc6
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count.aql
@@ -0,0 +1,20 @@
+/*
+ * Description    : Tests the scalar version of count without nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_count.adm";
+
+let $i8 := count([int8("1"), int8("2"), int8("3")])
+let $i16 := count([int16("1"), int16("2"), int16("3")])
+let $i32 := count([int32("1"), int32("2"), int32("3")])
+let $i64 := count([int64("1"), int64("2"), int64("3")])
+let $f := count([float("1"), float("2"), float("3")])
+let $d := count([double("1"), double("2"), double("3")])
+let $s := count(["a", "b", "c"])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count_empty.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count_empty.aql
new file mode 100644
index 0000000..afd3dab
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count_empty.aql
@@ -0,0 +1,12 @@
+/*
+ * Description    : Tests the scalar version of count with an empty list.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_count_empty.adm";
+
+count([ ])
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count_null.aql
new file mode 100644
index 0000000..60dd19f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_count_null.aql
@@ -0,0 +1,20 @@
+/*
+ * Description    : Tests the scalar version of count with nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_count_null.adm";
+
+let $i8 := count([int8("1"), int8("2"), int8("3"), null])
+let $i16 := count([int16("1"), int16("2"), int16("3"), null])
+let $i32 := count([int32("1"), int32("2"), int32("3"), null])
+let $i64 := count([int64("1"), int64("2"), int64("3"), null])
+let $f := count([float("1"), float("2"), float("3"), null])
+let $d := count([double("1"), double("2"), double("3"), null])
+let $s := count(["a", "b", "c", null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql
new file mode 100644
index 0000000..8eefced
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max.aql
@@ -0,0 +1,21 @@
+/*
+ * Description    : Tests the scalar version of max without nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_max.adm";
+
+let $i8 := max([int8("1"), int8("2"), int8("3")])
+let $i16 := max([int16("1"), int16("2"), int16("3")])
+let $i32 := max([int32("1"), int32("2"), int32("3")])
+let $i64 := max([int64("1"), int64("2"), int64("3")])
+let $f := max([float("1"), float("2"), float("3")])
+let $d := max([double("1"), double("2"), double("3")])
+let $s := max(["foo", "bar", "world"])
+let $dt := max([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z")])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_empty.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_empty.aql
new file mode 100644
index 0000000..3a4396e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_empty.aql
@@ -0,0 +1,12 @@
+/*
+ * Description    : Tests the scalar version of max with an empty list.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_max_empty.adm";
+
+max([ ])
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql
new file mode 100644
index 0000000..04436bf
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_max_null.aql
@@ -0,0 +1,21 @@
+/*
+ * Description    : Tests the scalar version of max with nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_max_null.adm";
+
+let $i8 := max([int8("1"), int8("2"), int8("3"), null])
+let $i16 := max([int16("1"), int16("2"), int16("3"), null])
+let $i32 := max([int32("1"), int32("2"), int32("3"), null])
+let $i64 := max([int64("1"), int64("2"), int64("3"), null])
+let $f := max([float("1"), float("2"), float("3"), null])
+let $d := max([double("1"), double("2"), double("3"), null])
+let $s := max(["foo", "bar", "world", null])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z"), null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql
new file mode 100644
index 0000000..04aa735
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min.aql
@@ -0,0 +1,21 @@
+/*
+ * Description    : Tests the scalar version of min without nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_min.adm";
+
+let $i8 := min([int8("1"), int8("2"), int8("3")])
+let $i16 := min([int16("1"), int16("2"), int16("3")])
+let $i32 := min([int32("1"), int32("2"), int32("3")])
+let $i64 := min([int64("1"), int64("2"), int64("3")])
+let $f := min([float("1"), float("2"), float("3")])
+let $d := min([double("1"), double("2"), double("3")])
+let $s := min(["foo", "bar", "world"])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z")])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_empty.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_empty.aql
new file mode 100644
index 0000000..e242a35
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_empty.aql
@@ -0,0 +1,12 @@
+/*
+ * Description    : Tests the scalar version of min with an empty list.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_min_empty.adm";
+
+min([ ])
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql
new file mode 100644
index 0000000..523ca26
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_min_null.aql
@@ -0,0 +1,21 @@
+/*
+ * Description    : Tests the scalar version of min with nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_min_null.adm";
+
+let $i8 := min([int8("1"), int8("2"), int8("3"), null])
+let $i16 := min([int16("1"), int16("2"), int16("3"), null])
+let $i32 := min([int32("1"), int32("2"), int32("3"), null])
+let $i64 := min([int64("1"), int64("2"), int64("3"), null])
+let $f := min([float("1"), float("2"), float("3"), null])
+let $d := min([double("1"), double("2"), double("3"), null])
+let $s := min(["foo", "bar", "world", null])
+let $dt := min([datetime("2012-03-01T00:00:00Z"), datetime("2012-01-01T00:00:00Z"), datetime("2012-02-01T00:00:00Z"), null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d, $s, $dt]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum.aql
new file mode 100644
index 0000000..843360f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum.aql
@@ -0,0 +1,19 @@
+/*
+ * Description    : Tests the scalar version of sum without nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_sum.adm";
+
+let $i8 := sum([int8("1"), int8("2"), int8("3")])
+let $i16 := sum([int16("1"), int16("2"), int16("3")])
+let $i32 := sum([int32("1"), int32("2"), int32("3")])
+let $i64 := sum([int64("1"), int64("2"), int64("3")])
+let $f := sum([float("1"), float("2"), float("3")])
+let $d := sum([double("1"), double("2"), double("3")])
+for $i in [$i8, $i16, $i32, $i64, $f, $d]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum_empty.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum_empty.aql
new file mode 100644
index 0000000..35bf676
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum_empty.aql
@@ -0,0 +1,12 @@
+/*
+ * Description    : Tests the scalar version of sum with an empty list.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_sum_empty.adm";
+
+sum([ ])
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum_null.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum_null.aql
new file mode 100644
index 0000000..a4c2e61
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/scalar_sum_null.aql
@@ -0,0 +1,19 @@
+/*
+ * Description    : Tests the scalar version of sum with nulls.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_scalar_sum_null.adm";
+
+let $i8 := sum([int8("1"), int8("2"), int8("3"), null])
+let $i16 := sum([int16("1"), int16("2"), int16("3"), null])
+let $i32 := sum([int32("1"), int32("2"), int32("3"), null])
+let $i64 := sum([int64("1"), int64("2"), int64("3"), null])
+let $f := sum([float("1"), float("2"), float("3"), null])
+let $d := sum([double("1"), double("2"), double("3"), null])
+for $i in [$i8, $i16, $i32, $i64, $f, $d]
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql
new file mode 100644
index 0000000..b4e26b8
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_01.aql
@@ -0,0 +1,17 @@
+/*
+ * Description    : Tests that sum aggregation correctly returns null for an empty stream,
+ *                  without an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/aggregate_sum_empty_01.adm";
+
+sum(
+ for $x in [1, 2, 3]
+ where $x > 10
+ return $x
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql
new file mode 100644
index 0000000..a94457a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/aggregate/sum_empty_02.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that sum aggregation correctly returns null for an empty stream,
+ *                  with an aggregate combiner.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TestType as closed {
+  id: int32,
+  val: double
+}
+
+create dataset Test(TestType) partitioned by key id;
+
+write output to nc1:"rttest/aggregate_sum_empty_02.adm";
+
+sum(
+ for $x in dataset('Test')
+ return $x.val
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/list/ordered-list-constructor_03.aql b/asterix-app/src/test/resources/runtimets/queries/list/ordered-list-constructor_03.aql
new file mode 100644
index 0000000..81eaf8b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/list/ordered-list-constructor_03.aql
@@ -0,0 +1,7 @@
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/list_ordered-list-constructor_03.adm";
+
+[ null, null, null ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/list/unordered-list-constructor_03.aql b/asterix-app/src/test/resources/runtimets/queries/list/unordered-list-constructor_03.aql
new file mode 100644
index 0000000..e8b119e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/list/unordered-list-constructor_03.aql
@@ -0,0 +1,7 @@
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/list_unordered-list-constructor_03.adm";
+
+{{ null, null, null }}
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/quantifiers/everysat_04.aql b/asterix-app/src/test/resources/runtimets/queries/quantifiers/everysat_04.aql
new file mode 100644
index 0000000..b9eccfd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/quantifiers/everysat_04.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that universal quantification returns true/false correctly.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/quantifiers_everysat_04.adm";
+
+let $x := [
+every $x in [false,false] satisfies $x,
+every $x in [true,false] satisfies $x,
+every $x in [false,true] satisfies $x,
+every $x in [true,true] satisfies $x,
+every $x in [false,false] satisfies not($x),
+every $x in [true,false] satisfies not($x),
+every $x in [false,true] satisfies not($x),
+every $x in [true,true] satisfies not($x)
+]
+for $i in $x
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql b/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql
new file mode 100644
index 0000000..6e8892c
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/quantifiers/somesat_06.aql
@@ -0,0 +1,23 @@
+/*
+ * Description    : Tests that existential quantification returns true/false correctly.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+write output to nc1:"rttest/quantifiers_somesat_06.adm";
+
+let $x := [
+some $x in [false,false] satisfies $x,
+some $x in [true,false] satisfies $x,
+some $x in [false,true] satisfies $x,
+some $x in [true,true] satisfies $x,
+some $x in [false,false] satisfies not($x),
+some $x in [true,false] satisfies not($x),
+some $x in [false,true] satisfies not($x),
+some $x in [true,true] satisfies not($x)
+]
+for $i in $x
+return $i
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql b/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
index b53fc877..af39b3f 100644
--- a/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/tpch/q1_pricing_summary_report_nt.aql
@@ -1,7 +1,5 @@
 drop dataverse tpch if exists;
 create dataverse tpch;
-  
-
 use dataverse tpch;
 
 create type LineItemType as closed {
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/avg_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_01.adm
@@ -0,0 +1 @@
+0
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/count_empty_02.adm
@@ -0,0 +1 @@
+0
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/count_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/count_null.adm
index df462fe..51d5f4f 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/count_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/count_null.adm
@@ -1 +1 @@
-{ "count": 2 }
\ No newline at end of file
+{ "count": null }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_double_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_float_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int16_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int32_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int64_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
index 8649548..b11c820 100644
--- a/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/local-avg_int8_null.adm
@@ -1 +1 @@
-{ "sum": null, "count": 2 }
\ No newline at end of file
+{ "sum": null, "count": 1 }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/max_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/min_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg.adm
new file mode 100644
index 0000000..483cb2f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg.adm
@@ -0,0 +1,6 @@
+2.0
+2.0
+2.0
+2.0
+2.0
+2.0
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg_empty.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg_empty.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg_empty.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg_null.adm
new file mode 100644
index 0000000..0800a91
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_avg_null.adm
@@ -0,0 +1,6 @@
+null
+null
+null
+null
+null
+null
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count.adm
new file mode 100644
index 0000000..80f0b99
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count.adm
@@ -0,0 +1,7 @@
+3
+3
+3
+3
+3
+3
+3
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count_empty.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count_empty.adm
new file mode 100644
index 0000000..573541a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count_empty.adm
@@ -0,0 +1 @@
+0
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count_null.adm
new file mode 100644
index 0000000..1abbc3f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_count_null.adm
@@ -0,0 +1,7 @@
+null
+null
+null
+null
+null
+null
+null
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm
new file mode 100644
index 0000000..d420c2f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max.adm
@@ -0,0 +1,8 @@
+3i8
+3i16
+3
+3i64
+3.0f
+3.0d
+"world"
+datetime("2012-03-01T00:00:00.000Z")
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_empty.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_empty.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_empty.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm
new file mode 100644
index 0000000..c9f3cb3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_max_null.adm
@@ -0,0 +1,8 @@
+null
+null
+null
+null
+null
+null
+null
+null
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm
new file mode 100644
index 0000000..6acf213
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min.adm
@@ -0,0 +1,8 @@
+1i8
+1i16
+1
+1i64
+1.0f
+1.0d
+"bar"
+datetime("2012-01-01T00:00:00.000Z")
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_empty.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_empty.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_empty.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm
new file mode 100644
index 0000000..c9f3cb3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_min_null.adm
@@ -0,0 +1,8 @@
+null
+null
+null
+null
+null
+null
+null
+null
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum.adm
new file mode 100644
index 0000000..6e7617e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum.adm
@@ -0,0 +1,6 @@
+6i8
+6i16
+6
+6i64
+6.0f
+6.0d
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum_empty.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum_empty.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum_empty.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum_null.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum_null.adm
new file mode 100644
index 0000000..0800a91
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/scalar_sum_null.adm
@@ -0,0 +1,6 @@
+null
+null
+null
+null
+null
+null
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_01.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm
new file mode 100644
index 0000000..19765bd
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/aggregate/sum_empty_02.adm
@@ -0,0 +1 @@
+null
diff --git a/asterix-app/src/test/resources/runtimets/results/list/ordered-list-constructor_03.adm b/asterix-app/src/test/resources/runtimets/results/list/ordered-list-constructor_03.adm
new file mode 100644
index 0000000..5c06d0b
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/list/ordered-list-constructor_03.adm
@@ -0,0 +1 @@
+[ null, null, null ]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/list/unordered-list-constructor_03.adm b/asterix-app/src/test/resources/runtimets/results/list/unordered-list-constructor_03.adm
new file mode 100644
index 0000000..8ffe6e4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/list/unordered-list-constructor_03.adm
@@ -0,0 +1 @@
+{{ null, null, null }}
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_01.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_01.adm
new file mode 100644
index 0000000..d6cf966
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_01.adm
@@ -0,0 +1 @@
+-30
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_02.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_02.adm
new file mode 100644
index 0000000..fd6a9d9
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_02.adm
@@ -0,0 +1,12 @@
+false
+true
+false
+true
+false
+false
+true
+false
+false
+false
+false
+false
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_04.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_04.adm
new file mode 100644
index 0000000..98ce28d
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/everysat_04.adm
@@ -0,0 +1,8 @@
+false
+false
+false
+true
+true
+false
+false
+false
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm b/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm
new file mode 100644
index 0000000..2cc1766
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/quantifiers/somesat_06.adm
@@ -0,0 +1,8 @@
+false
+true
+true
+true
+true
+true
+true
+false
\ No newline at end of file
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/builders/OrderedListBuilder.java b/asterix-om/src/main/java/edu/uci/ics/asterix/builders/OrderedListBuilder.java
index 18cec15..9fb422d 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/builders/OrderedListBuilder.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/builders/OrderedListBuilder.java
@@ -58,7 +58,7 @@
     public void addItem(IValueReference item) throws HyracksDataException {
         if (!fixedSize)
             this.offsets.add((short) outputStream.size());
-        if (itemTypeTag == ATypeTag.ANY) {
+        if (itemTypeTag == ATypeTag.ANY || (itemTypeTag == ATypeTag.NULL && item.getByteArray()[0] == serNullTypeTag)) {
             this.numberOfItems++;
             this.outputStream.write(item.getByteArray(), item.getStartOffset(), item.getLength());
         } else if (item.getByteArray()[0] != serNullTypeTag) {
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/builders/UnorderedListBuilder.java b/asterix-om/src/main/java/edu/uci/ics/asterix/builders/UnorderedListBuilder.java
index e8dbc29..90fb2b5 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/builders/UnorderedListBuilder.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/builders/UnorderedListBuilder.java
@@ -61,7 +61,7 @@
     public void addItem(IValueReference item) throws HyracksDataException {
         if (!fixedSize)
             this.offsets.add((short) outputStream.size());
-        if (itemTypeTag == ATypeTag.ANY) {
+        if (itemTypeTag == ATypeTag.ANY || (itemTypeTag == ATypeTag.NULL && item.getByteArray()[0] == serNullTypeTag)) {
             this.numberOfItems++;
             this.outputStream.write(item.getByteArray(), item.getStartOffset(), item.getLength());
         } else if (item.getByteArray()[0] != serNullTypeTag) {
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/printers/AOrderedlistPrinterFactory.java b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/printers/AOrderedlistPrinterFactory.java
index 9edb640..563b14c 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/printers/AOrderedlistPrinterFactory.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/printers/AOrderedlistPrinterFactory.java
@@ -34,10 +34,6 @@
             private ATypeTag itemTag;
             private boolean typedItemList = false;
 
-            // private int itemLength;
-            // private int numberOfitems;
-            // private int itemOffset;
-
             @Override
             public void init() throws AlgebricksException {
 
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
index 93167cf..e4db8da 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlBinaryComparatorFactoryProvider.java
@@ -6,7 +6,6 @@
 import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.AObjectDescBinaryComparatorFactory;
 import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.BooleanBinaryComparatorFactory;
-import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.LongBinaryComparatorFactory;
 import edu.uci.ics.asterix.dataflow.data.nontagged.comparators.RectangleBinaryComparatorFactory;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -15,17 +14,26 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.BytePointable;
 import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
 import edu.uci.ics.hyracks.data.std.primitive.FloatPointable;
 import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 
 public class AqlBinaryComparatorFactoryProvider implements IBinaryComparatorFactoryProvider, Serializable {
 
     private static final long serialVersionUID = 1L;
     public static final AqlBinaryComparatorFactoryProvider INSTANCE = new AqlBinaryComparatorFactoryProvider();
+    public static final PointableBinaryComparatorFactory BYTE_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+            BytePointable.FACTORY);
+    public static final PointableBinaryComparatorFactory SHORT_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+            ShortPointable.FACTORY);
     public static final PointableBinaryComparatorFactory INTEGER_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
             IntegerPointable.FACTORY);
+    public static final PointableBinaryComparatorFactory LONG_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
+            LongPointable.FACTORY);
     public static final PointableBinaryComparatorFactory FLOAT_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
             FloatPointable.FACTORY);
     public static final PointableBinaryComparatorFactory DOUBLE_POINTABLE_INSTANCE = new PointableBinaryComparatorFactory(
@@ -58,7 +66,11 @@
             return anyBinaryComparatorFactory(ascending);
         }        
         IAType aqlType = (IAType) type;
-        switch (aqlType.getTypeTag()) {
+        return getBinaryComparatorFactory(aqlType.getTypeTag(), ascending);
+    }
+    
+    public IBinaryComparatorFactory getBinaryComparatorFactory(ATypeTag type, boolean ascending) {
+        switch (type) {
             case ANY:
             case UNION: { // we could do smth better for nullable fields
                 return anyBinaryComparatorFactory(ascending);
@@ -83,11 +95,17 @@
             case BOOLEAN: {
                 return addOffset(BooleanBinaryComparatorFactory.INSTANCE, ascending);
             }
+            case INT8: {
+                return addOffset(BYTE_POINTABLE_INSTANCE, ascending);
+            }
+            case INT16: {
+                return addOffset(SHORT_POINTABLE_INSTANCE, ascending);
+            }
             case INT32: {
                 return addOffset(INTEGER_POINTABLE_INSTANCE, ascending);
             }
             case INT64: {
-                return addOffset(LongBinaryComparatorFactory.INSTANCE, ascending);
+                return addOffset(LONG_POINTABLE_INSTANCE, ascending);
             }
             case FLOAT: {
                 return addOffset(FLOAT_POINTABLE_INSTANCE, ascending);
@@ -101,12 +119,14 @@
             case RECTANGLE: {
                 return addOffset(RectangleBinaryComparatorFactory.INSTANCE, ascending);
             }
+            case DATE:
+            case TIME:
             case DATETIME: {
-            	return addOffset(ADateTimeAscBinaryComparatorFactory.INSTANCE, ascending);
+                return addOffset(ADateTimeAscBinaryComparatorFactory.INSTANCE, ascending);
             }
             default: {
                 throw new NotImplementedException("No binary comparator factory implemented for type "
-                        + aqlType.getTypeTag() + " .");
+                        + type + " .");
             }
         }
     }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 552a093..a624cf9 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -9,7 +9,6 @@
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.common.AqlNullableTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ABooleanTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ACircleTypeComputer;
@@ -235,8 +234,11 @@
     public final static FunctionIdentifier AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-avg", 1);
     public final static FunctionIdentifier COUNT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-count", 1);
     public final static FunctionIdentifier SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sum", 1);
+    public final static FunctionIdentifier LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum", 1);
     public final static FunctionIdentifier MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-max", 1);
+    public final static FunctionIdentifier LOCAL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-max", 1);
     public final static FunctionIdentifier MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-min", 1);
+    public final static FunctionIdentifier LOCAL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-min", 1);
     public final static FunctionIdentifier GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "agg-global-avg", 1);
     public final static FunctionIdentifier LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -260,6 +262,8 @@
             "count-serial", 1);
     public final static FunctionIdentifier SERIAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "sum-serial", 1);
+    public final static FunctionIdentifier SERIAL_LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sum-serial", 1);
     public final static FunctionIdentifier SERIAL_GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "global-avg-serial", 1);
     public final static FunctionIdentifier SERIAL_LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -511,8 +515,10 @@
         addPrivateFunction(LOCAL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE);
         add(MAKE_FIELD_INDEX_HANDLE, null); // TODO
         add(MAKE_FIELD_NAME_HANDLE, null); // TODO
-        addPrivateFunction(MAX, NonTaggedSumTypeComputer.INSTANCE);
-        addPrivateFunction(MIN, NonTaggedSumTypeComputer.INSTANCE);
+        add(MAX, NonTaggedSumTypeComputer.INSTANCE);
+        add(LOCAL_MAX, NonTaggedSumTypeComputer.INSTANCE);
+        add(MIN, NonTaggedSumTypeComputer.INSTANCE);
+        add(LOCAL_MIN, NonTaggedSumTypeComputer.INSTANCE);
         add(NON_EMPTY_STREAM, ABooleanTypeComputer.INSTANCE);
         add(NULL_CONSTRUCTOR, ANullTypeComputer.INSTANCE);
         add(NUMERIC_UNARY_MINUS, NonTaggedUnaryMinusTypeComputer.INSTANCE);
@@ -568,6 +574,7 @@
         add(SERIAL_GLOBAL_AVG, OptionalADoubleTypeComputer.INSTANCE);
         add(SERIAL_LOCAL_AVG, NonTaggedLocalAvgTypeComputer.INSTANCE);
         add(SERIAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
+        add(SERIAL_LOCAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
         add(SIMILARITY_JACCARD, AFloatTypeComputer.INSTANCE);
         add(SIMILARITY_JACCARD_CHECK, OrderedListOfAnyTypeComputer.INSTANCE);
         add(SIMILARITY_JACCARD_SORTED, AFloatTypeComputer.INSTANCE);
@@ -619,6 +626,7 @@
         });
         add(SUBSTRING, SubstringTypeComputer.INSTANCE);
         addPrivateFunction(SUM, NonTaggedSumTypeComputer.INSTANCE);
+        add(LOCAL_SUM, NonTaggedSumTypeComputer.INSTANCE);
         add(SWITCH_CASE, NonTaggedSwitchCaseComputer.INSTANCE);
         add(REG_EXP, ABooleanTypeComputer.INSTANCE);
         add(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE);
@@ -668,15 +676,17 @@
         addGlobalAgg(COUNT, SUM);
 
         addAgg(MAX);
-        addLocalAgg(MAX, MAX);
+        addAgg(LOCAL_MAX);
+        addLocalAgg(MAX, LOCAL_MAX);
         addGlobalAgg(MAX, MAX);
 
         addAgg(MIN);
-        addLocalAgg(MIN, MIN);
+        addLocalAgg(MIN, LOCAL_MIN);
         addGlobalAgg(MIN, MIN);
 
         addAgg(SUM);
-        addLocalAgg(SUM, SUM);
+        addAgg(LOCAL_SUM);
+        addLocalAgg(SUM, LOCAL_SUM);
         addGlobalAgg(SUM, SUM);
 
         addAgg(LISTIFY);
@@ -685,6 +695,7 @@
         addSerialAgg(AVG, SERIAL_AVG);
         addSerialAgg(COUNT, SERIAL_COUNT);
         addSerialAgg(SUM, SERIAL_SUM);
+        addSerialAgg(LOCAL_SUM, SERIAL_LOCAL_SUM);
         addSerialAgg(LOCAL_AVG, SERIAL_LOCAL_AVG);
         addSerialAgg(GLOBAL_AVG, SERIAL_GLOBAL_AVG);
 
@@ -699,7 +710,8 @@
         addGlobalAgg(SERIAL_AVG, SERIAL_GLOBAL_AVG);
 
         addAgg(SERIAL_SUM);
-        addLocalAgg(SERIAL_SUM, SERIAL_SUM);
+        addAgg(SERIAL_LOCAL_SUM);
+        addLocalAgg(SERIAL_SUM, SERIAL_LOCAL_SUM);
         addGlobalAgg(SERIAL_SUM, SERIAL_SUM);
     }
 
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
index aac5447..b75c074 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
@@ -23,7 +23,7 @@
     FLOAT(11),
     DOUBLE(12),
     STRING(13),
-    NULL(14),
+    NULL(14),    
     BOOLEAN(15),
     DATETIME(16),
     DATE(17),
@@ -42,7 +42,8 @@
     LINE(30),
     POLYGON(31),
     CIRCLE(32),
-    RECTANGLE(33);
+    RECTANGLE(33),
+    SYSTEM_NULL(34);
 
     private byte value;
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/base/SingleFieldFrameTupleReference.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/base/SingleFieldFrameTupleReference.java
new file mode 100644
index 0000000..c0c08fd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/base/SingleFieldFrameTupleReference.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.asterix.runtime.aggregates.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SingleFieldFrameTupleReference implements IFrameTupleReference {
+
+    private byte[] fieldData;
+    private int start;
+    private int length;
+    
+    public void reset(byte[] fieldData, int start, int length) {
+        this.fieldData = fieldData;
+        this.start = start;
+        this.length = length;
+    }
+    
+    @Override
+    public int getFieldCount() {
+        return 1;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fieldData;
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+       return start;
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+       return length;
+    }
+
+    @Override
+    public IFrameTupleAccessor getFrameTupleAccessor() {
+        return null;
+    }
+
+    @Override
+    public int getTupleIndex() {
+        return 0;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/AbstractScalarAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/AbstractScalarAggregateDescriptor.java
new file mode 100644
index 0000000..3de05f8
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/AbstractScalarAggregateDescriptor.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.functions.FunctionManagerHolder;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionManager;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import edu.uci.ics.asterix.runtime.unnestingfunctions.std.ScanCollectionDescriptor.ScanCollectionUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public abstract class AbstractScalarAggregateDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopyEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
+                // The aggregate function will get a SingleFieldFrameTupleReference that points to the result of the ScanCollection.
+                // The list-item will always reside in the first field (column) of the SingleFieldFrameTupleReference.
+                ICopyEvaluatorFactory[] aggFuncArgs = new ICopyEvaluatorFactory[1];
+                aggFuncArgs[0] = new ColumnAccessEvalFactory(0);
+                // Create aggregate function from this scalar version.
+                FunctionIdentifier fid = AsterixBuiltinFunctions.getAggregateFunction(getIdentifier());
+                IFunctionManager mgr = FunctionManagerHolder.getFunctionManager();
+                IFunctionDescriptor fd = mgr.lookupFunction(fid);
+                AbstractAggregateFunctionDynamicDescriptor aggFuncDesc = (AbstractAggregateFunctionDynamicDescriptor) fd;
+                ICopyAggregateFunctionFactory aggFuncFactory = aggFuncDesc.createAggregateFunctionFactory(aggFuncArgs);
+                // Use ScanCollection to iterate over list items.
+                ScanCollectionUnnestingFunctionFactory scanCollectionFactory = new ScanCollectionUnnestingFunctionFactory(
+                        args[0]);
+                return new GenericScalarAggregateFunction(aggFuncFactory.createAggregateFunction(output),
+                        scanCollectionFactory);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/GenericScalarAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/GenericScalarAggregateFunction.java
new file mode 100644
index 0000000..195b391
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/GenericScalarAggregateFunction.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.runtime.aggregates.base.SingleFieldFrameTupleReference;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Implements scalar aggregates by iterating over a collection with the ScanCollection unnesting function,
+ * and applying the corresponding ICopyAggregateFunction to each collection-item.
+ */
+public class GenericScalarAggregateFunction implements ICopyEvaluator {
+
+    private final ArrayBackedValueStorage listItemOut = new ArrayBackedValueStorage();
+    private final ICopyAggregateFunction aggFunc;
+    private final ICopyUnnestingFunction scanCollection;
+
+    private final SingleFieldFrameTupleReference itemTuple = new SingleFieldFrameTupleReference();
+
+    public GenericScalarAggregateFunction(ICopyAggregateFunction aggFunc,
+            ICopyUnnestingFunctionFactory scanCollectionFactory) throws AlgebricksException {
+        this.aggFunc = aggFunc;
+        this.scanCollection = scanCollectionFactory.createUnnestingFunction(listItemOut);
+        listItemOut.reset();
+    }
+
+    @Override
+    public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+        scanCollection.init(tuple);
+        aggFunc.init();
+        while (scanCollection.step()) {
+            itemTuple.reset(listItemOut.getByteArray(), 0, listItemOut.getLength());
+            aggFunc.step(itemTuple);
+            listItemOut.reset();
+        }
+        aggFunc.finish();
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarAvgAggregateDescriptor.java
new file mode 100644
index 0000000..ae2a485
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarAvgAggregateDescriptor.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarAvgAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "avg", 1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarAvgAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarCountAggregateDescriptor.java
new file mode 100644
index 0000000..6f3baa6
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarCountAggregateDescriptor.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarCountAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "count", 1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarCountAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarMaxAggregateDescriptor.java
new file mode 100644
index 0000000..a71eb3c
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarMaxAggregateDescriptor.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarMaxAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "max", 1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarMinAggregateDescriptor.java
new file mode 100644
index 0000000..4beae60
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarMinAggregateDescriptor.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarMinAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "min", 1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSumAggregateDescriptor.java
new file mode 100644
index 0000000..f3d9d1a
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ScalarSumAggregateDescriptor.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.runtime.aggregates.scalar;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ScalarSumAggregateDescriptor extends AbstractScalarAggregateDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "sum", 1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new ScalarSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
index d8f2553..7f42938 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableCountAggregateDescriptor.java
@@ -7,26 +7,30 @@
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
- * NULLs are also counted.
+ * count(NULL) returns NULL.
  */
 public class SerializableCountAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "count-serial",
-            1);
+    public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "count-serial", 1);
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableCountAggregateDescriptor();
@@ -39,8 +43,8 @@
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
-            throws AlgebricksException {
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
             private static final long serialVersionUID = 1L;
 
@@ -52,10 +56,16 @@
                     @SuppressWarnings("unchecked")
                     private ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
                             .getSerializerDeserializer(BuiltinType.AINT32);
+                    @SuppressWarnings("unchecked")
+                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ANULL);
+                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
 
                     @Override
                     public void init(DataOutput state) throws AlgebricksException {
                         try {
+                            state.writeBoolean(false);
                             state.writeInt(0);
                         } catch (IOException e) {
                             throw new AlgebricksException(e);
@@ -65,17 +75,32 @@
                     @Override
                     public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
                             throws AlgebricksException {
-                        int cnt = BufferSerDeUtil.getInt(state, start);
-                        cnt++;
-                        BufferSerDeUtil.writeInt(cnt, state, start);
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+                        int cnt = BufferSerDeUtil.getInt(state, start + 1);
+                        inputVal.reset();
+                        eval.evaluate(tuple);
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        if (typeTag == ATypeTag.NULL) {
+                            metNull = true;
+                        } else {
+                            cnt++;
+                        }
+                        BufferSerDeUtil.writeBoolean(metNull, state, start);
+                        BufferSerDeUtil.writeInt(cnt, state, start + 1);
                     }
 
                     @Override
                     public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-                        int cnt = BufferSerDeUtil.getInt(state, start);
+                        boolean metNull = BufferSerDeUtil.getBoolean(state, start);
+                        int cnt = BufferSerDeUtil.getInt(state, start + 1);
                         try {
-                            result.setValue(cnt);
-                            int32Serde.serialize(result, out);
+                            if (metNull) {
+                                nullSerde.serialize(ANull.NULL, out);
+                            } else {
+                                result.setValue(cnt);
+                                int32Serde.serialize(result, out);
+                            }
                         } catch (IOException e) {
                             throw new AlgebricksException(e);
                         }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
index 02d24a0..9c8427a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableGlobalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
@@ -44,8 +43,6 @@
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "global-avg-serial", 1);
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new SerializableGlobalAvgAggregateDescriptor();
@@ -119,12 +116,25 @@
                         inputVal.reset();
                         eval.evaluate(tuple);
                         byte[] serBytes = inputVal.getByteArray();
-                        if (serBytes[0] == SER_NULL_TYPE_TAG)
-                            metNull = true;
-                        if (serBytes[0] != SER_RECORD_TYPE_TAG) {
-                            throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                    + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
-                        }
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);                        
+                        switch (typeTag) {
+                            case NULL: {
+                                metNull = true;
+                                break;
+                            }
+                            case SYSTEM_NULL: {
+                                // Ignore and return.
+                                return;
+                            }
+                            case RECORD: {
+                                // Expected.
+                                break;
+                            }
+                            default: {
+                                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                            }
+                        }                        
                         int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, 0, 1, true);
                         if (offset1 == 0) // the sum is null
                             metNull = true;
@@ -146,19 +156,15 @@
                         long globalCount = BufferSerDeUtil.getLong(state, start + 8);
                         boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
 
-                        if (globalCount == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, result);
-                                else {
-                                    aDouble.setValue(globalSum / globalCount);
-                                    doubleSerde.serialize(aDouble, result);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (globalCount == 0 || metNull)
+                                nullSerde.serialize(ANull.NULL, result);
+                            else {
+                                aDouble.setValue(globalSum / globalCount);
+                                doubleSerde.serialize(aDouble, result);
                             }
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
@@ -173,25 +179,21 @@
                             recordEval = new ClosedRecordConstructorEval(recType,
                                     new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
 
-                        if (globalCount == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(globalSum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt64.setValue(globalCount);
-                                longSerde.serialize(aInt64, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (globalCount == 0 || metNull) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(globalSum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
                             }
+                            countBytes.reset();
+                            aInt64.setValue(globalCount);
+                            longSerde.serialize(aInt64, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
                 };
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index d96e488..3dea2e1 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -60,8 +59,8 @@
     }
 
     @Override
-    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(ICopyEvaluatorFactory[] args)
-            throws AlgebricksException {
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            ICopyEvaluatorFactory[] args) throws AlgebricksException {
         final ICopyEvaluatorFactory[] evals = args;
         List<IAType> unionList = new ArrayList<IAType>();
         unionList.add(BuiltinType.ANULL);
@@ -104,7 +103,7 @@
                         try {
                             state.writeDouble(0.0);
                             state.writeLong(0);
-                            state.writeBoolean(false);
+                            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
                         } catch (IOException e) {
                             throw new AlgebricksException(e);
                         }
@@ -117,87 +116,90 @@
                         eval.evaluate(tuple);
                         double sum = BufferSerDeUtil.getDouble(state, start);
                         long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                                    .deserialize(inputVal.getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute AVG for values of type "
-                                            + typeTag);
-                                }
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+                            aggType = ATypeTag.NULL;
+                            return;
+                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+                            aggType = typeTag;
+                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+                            throw new AlgebricksException("Unexpected type " + typeTag
+                                    + " in aggregation input stream. Expected type " + aggType + ".");
+                        }
+                        ++count;
+                        switch (typeTag) {
+                            case INT8: {
+                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
                             }
-                            inputVal.reset();
+                            case INT16: {
+                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT32: {
+                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT64: {
+                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case FLOAT: {
+                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case DOUBLE: {
+                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case NULL: {
+                                break;
+                            }
+                            default: {
+                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+                            }
                         }
                         BufferSerDeUtil.writeDouble(sum, state, start);
                         BufferSerDeUtil.writeLong(count, state, start + 8);
-                        BufferSerDeUtil.writeBoolean(metNull, state, start + 16);
+                        state[start + 16] = aggType.serialize();
                     }
 
                     @Override
                     public void finish(byte[] state, int start, int len, DataOutput result) throws AlgebricksException {
                         double sum = BufferSerDeUtil.getDouble(state, start);
                         long count = BufferSerDeUtil.getLong(state, start + 8);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, start + 16);
-                        if (recordEval == null)
-                            recordEval = new ClosedRecordConstructorEval(recType,
-                                    new ICopyEvaluator[] { evalSum, evalCount }, avgBytes, result);
-                        if (count == 0) {
-                            if (GlobalConfig.DEBUG) {
-                                GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + 16]);
+                        if (recordEval == null) {
+                            recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum,
+                                    evalCount }, avgBytes, result);
+                        }
+                        try {
+                            if (count == 0) {
+                                result.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                                return;
                             }
-                        } else {
-                            try {
-                                if (metNull) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(sum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt64.setValue(count);
-                                int64Serde.serialize(aInt64, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                            if (aggType == ATypeTag.NULL) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(sum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
                             }
+                            countBytes.reset();
+                            aInt64.setValue(count);
+                            int64Serde.serialize(aInt64, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
new file mode 100644
index 0000000..6badf0f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+
+public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "local-sum-serial", 1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SerializableLocalSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
+            final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+        return new ICopySerializableAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
+                return new SerializableSumAggregateFunction(args, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
index 510921a..7100ffb 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
@@ -1,40 +1,14 @@
 package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
@@ -55,182 +29,12 @@
     public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
         return new ICopySerializableAggregateFunctionFactory() {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = 1L;           
 
             @Override
             public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
-
-                return new ICopySerializableAggregateFunction() {
-
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableFloat aFloat = new AMutableFloat(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-                    private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-                    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-                    @SuppressWarnings("rawtypes")
-                    private ISerializerDeserializer serde;
-
-                    @Override
-                    public void init(DataOutput state) throws AlgebricksException {
-                        try {
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeBoolean(false);
-                            state.writeDouble(0.0);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple, byte[] state, int start, int len)
-                            throws AlgebricksException {
-                        int pos = start;
-                        boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
-                        double sum = BufferSerDeUtil.getDouble(state, pos);
-
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    metInt8s = true;
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    metInt16s = true;
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    metInt32s = true;
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    metInt64s = true;
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    metFloats = true;
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    metDoubles = true;
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                                            + typeTag);
-                                }
-                            }
-                        }
-
-                        pos = start;
-                        BufferSerDeUtil.writeBoolean(metInt8s, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metInt16s, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metInt32s, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metInt64s, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
-                        BufferSerDeUtil.writeBoolean(metNull, state, pos++);
-                        BufferSerDeUtil.writeDouble(sum, state, pos);
-                    }
-
-                    @SuppressWarnings("unchecked")
-                    @Override
-                    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
-                        int pos = start;
-                        boolean metInt8s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt16s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt32s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metInt64s = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
-                        boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
-                        double sum = BufferSerDeUtil.getDouble(state, pos);
-                        try {
-                            if (metNull) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ANULL);
-                                serde.serialize(ANull.NULL, out);
-                            } else if (metDoubles) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                                aDouble.setValue(sum);
-                                serde.serialize(aDouble, out);
-                            } else if (metFloats) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                                aFloat.setValue((float) sum);
-                                serde.serialize(aFloat, out);
-                            } else if (metInt64s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT64);
-                                aInt64.setValue((long) sum);
-                                serde.serialize(aInt64, out);
-                            } else if (metInt32s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT32);
-                                aInt32.setValue((int) sum);
-                                serde.serialize(aInt32, out);
-                            } else if (metInt16s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT16);
-                                aInt16.setValue((short) sum);
-                                serde.serialize(aInt16, out);
-                            } else if (metInt8s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT8);
-                                aInt8.setValue((byte) sum);
-                                serde.serialize(aInt8, out);
-                            } else {
-                                GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
-                            }
-
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-
-                    }
-
-                    @Override
-                    public void finishPartial(byte[] state, int start, int len, DataOutput out)
-                            throws AlgebricksException {
-                        finish(state, start, len, out);
-                    }
-                };
+                return new SerializableSumAggregateFunction(args, false);
             }
         };
     }
-
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
new file mode 100644
index 0000000..c027716
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -0,0 +1,198 @@
+package edu.uci.ics.asterix.runtime.aggregates.serializable.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SerializableSumAggregateFunction implements ICopySerializableAggregateFunction {
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    private ISerializerDeserializer serde;
+    private final boolean isLocalAgg;
+
+    public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
+            throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    public void init(DataOutput state) throws AlgebricksException {
+        try {
+            state.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+            state.writeDouble(0.0);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws AlgebricksException {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
+        double sum = BufferSerDeUtil.getDouble(state, start + 1);
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+            aggType = ATypeTag.NULL;
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+            throw new AlgebricksException("Unexpected type " + typeTag
+                    + " in aggregation input stream. Expected type " + aggType + ".");
+        }
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                aggType = typeTag;
+                break;
+            }
+            case SYSTEM_NULL: {
+                // For global aggregates simply ignore system null here,
+                // but if all input value are system null, then we should return
+                // null in finish().
+                if (isLocalAgg) {
+                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+                }
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+            }
+        }
+        state[start] = aggType.serialize();
+        BufferSerDeUtil.writeDouble(sum, state, start + 1);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start]);
+        double sum = BufferSerDeUtil.getDouble(state, start + 1);
+        try {
+            switch (aggType) {
+                case INT8: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+                    aInt8.setValue((byte) sum);
+                    serde.serialize(aInt8, out);
+                    break;
+                }
+                case INT16: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+                    aInt16.setValue((short) sum);
+                    serde.serialize(aInt16, out);
+                    break;
+                }
+                case INT32: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+                    aInt32.setValue((int) sum);
+                    serde.serialize(aInt32, out);
+                    break;
+                }
+                case INT64: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+                    aInt64.setValue((long) sum);
+                    serde.serialize(aInt64, out);
+                    break;
+                }
+                case FLOAT: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+                    aFloat.setValue((float) sum);
+                    serde.serialize(aFloat, out);
+                    break;
+                }
+                case DOUBLE: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    aDouble.setValue(sum);
+                    serde.serialize(aDouble, out);
+                    break;
+                }
+                case NULL: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    // Empty stream. For local agg return system null. For global agg return null.
+                    if (isLocalAgg) {
+                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                    } else {
+                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                        serde.serialize(ANull.NULL, out);
+                    }
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+
+    }
+
+    @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws AlgebricksException {
+        finish(state, start, len, out);
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index c82f1f0..08bb063 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -79,9 +79,10 @@
 
                     private DataOutput out = provider.getDataOutput();
                     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
+                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);                    
                     private double sum;
                     private int count;
+                    private ATypeTag aggType;
                     private AMutableDouble aDouble = new AMutableDouble(0);
                     private AMutableInt32 aInt32 = new AMutableInt32(0);
 
@@ -104,10 +105,10 @@
                     @SuppressWarnings("unchecked")
                     private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
                             .getSerializerDeserializer(BuiltinType.ANULL);
-                    private boolean metNull;
 
                     @Override
                     public void init() throws AlgebricksException {
+                        aggType = ATypeTag.SYSTEM_NULL;
                         sum = 0.0;
                         count = 0;
                     }
@@ -115,70 +116,72 @@
                     @Override
                     public void step(IFrameTupleReference tuple) throws AlgebricksException {
                         inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
+                        eval.evaluate(tuple);                        
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+                            aggType = ATypeTag.NULL;
+                            return;
+                        } else if (aggType == ATypeTag.SYSTEM_NULL) {                           
+                            aggType = typeTag;
+                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+                            throw new AlgebricksException("Unexpected type " + typeTag
+                                    + " in aggregation input stream. Expected type " + aggType + ".");
+                        }
+                        if (typeTag != ATypeTag.SYSTEM_NULL) {
                             ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute AVG for values of type "
-                                            + typeTag);
-                                }
+                        }                        
+                        switch (typeTag) {
+                            case INT8: {
+                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
                             }
-                            inputVal.reset();
+                            case INT16: {
+                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT32: {
+                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT64: {
+                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case FLOAT: {
+                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case DOUBLE: {
+                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case NULL: {
+                                break;
+                            }
+                            default: {
+                                throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
+                            }
                         }
                     }
 
                     @Override
                     public void finish() throws AlgebricksException {
-                        if (count == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, out);
-                                else {
-                                    aDouble.setValue(sum / count);
-                                    doubleSerde.serialize(aDouble, out);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (count == 0 || aggType == ATypeTag.NULL) {
+                                nullSerde.serialize(ANull.NULL, out);
+                            } else {
+                                aDouble.setValue(sum / count);
+                                doubleSerde.serialize(aDouble, out);
                             }
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
@@ -190,7 +193,7 @@
                             }
                         } else {
                             try {
-                                if (metNull) {
+                                if (aggType == ATypeTag.NULL) {
                                     sumBytes.reset();
                                     nullSerde.serialize(ANull.NULL, sumBytesOutput);
                                 } else {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateDescriptor.java
index 615f543..2b214d6 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateDescriptor.java
@@ -1,24 +1,15 @@
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * NULLs are also counted.
@@ -47,43 +38,8 @@
 
             @Override
             public ICopyAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws AlgebricksException {
-
-                final DataOutput out = provider.getDataOutput();
-
-                return new ICopyAggregateFunction() {
-                    private AMutableInt32 result = new AMutableInt32(-1);
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AINT32);
-                    private int cnt;
-
-                    @Override
-                    public void init() {
-                        cnt = 0;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        cnt++;
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            result.setValue(cnt);
-                            int32Serde.serialize(result, out);
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
+                return new CountAggregateFunction(args, provider);
             }
         };
     }
-
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
new file mode 100644
index 0000000..e4d015b
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/CountAggregateFunction.java
@@ -0,0 +1,81 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * count(NULL) returns NULL.
+ */
+public class CountAggregateFunction implements ICopyAggregateFunction {
+    private AMutableInt32 result = new AMutableInt32(-1);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ANULL);
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private boolean metNull;
+    private int cnt;
+    private DataOutput out;
+
+    public CountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output) throws AlgebricksException {
+        eval = args[0].createEvaluator(inputVal);
+        out = output.getDataOutput();
+    }
+
+    @Override
+    public void init() {
+        cnt = 0;
+        metNull = false;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        // Ignore SYSTEM_NULL.
+        if (typeTag == ATypeTag.NULL) {
+            metNull = true;
+        } else {
+            cnt++;
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            if (metNull) {
+                nullSerde.serialize(ANull.NULL, out);
+            } else {
+                result.setValue(cnt);
+                int32Serde.serialize(result, out);
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
index d94647b..7d49228 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/GlobalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
@@ -45,8 +44,6 @@
     private static final long serialVersionUID = 1L;
     public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-global-avg",
             1);
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
-    private final static byte SER_RECORD_TYPE_TAG = ATypeTag.RECORD.serialize();
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new GlobalAvgAggregateDescriptor();
@@ -117,11 +114,24 @@
                         inputVal.reset();
                         eval.evaluate(tuple);
                         byte[] serBytes = inputVal.getByteArray();
-                        if (serBytes[0] == SER_NULL_TYPE_TAG)
-                            metNull = true;
-                        if (serBytes[0] != SER_RECORD_TYPE_TAG) {
-                            throw new AlgebricksException("Global-Avg is not defined for values of type "
-                                    + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);                        
+                        switch (typeTag) {
+                            case NULL: {
+                                metNull = true;
+                                break;
+                            }
+                            case SYSTEM_NULL: {
+                                // Ignore and return.
+                                return;
+                            }
+                            case RECORD: {
+                                // Expected.
+                                break;
+                            }
+                            default: {
+                                throw new AlgebricksException("Global-Avg is not defined for values of type "
+                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]));
+                            }
                         }
                                                
                         // The record length helps us determine whether the input record fields are nullable.
@@ -143,43 +153,35 @@
 
                     @Override
                     public void finish() throws AlgebricksException {
-                        if (globalCount == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull)
-                                    nullSerde.serialize(ANull.NULL, out);
-                                else {
-                                    aDouble.setValue(globalSum / globalCount);
-                                    doubleSerde.serialize(aDouble, out);
-                                }
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (globalCount == 0 || metNull)
+                                nullSerde.serialize(ANull.NULL, out);
+                            else {
+                                aDouble.setValue(globalSum / globalCount);
+                                doubleSerde.serialize(aDouble, out);
                             }
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
                     @Override
                     public void finishPartial() throws AlgebricksException {
-                        if (globalCount == 0) {
-                            GlobalConfig.ASTERIX_LOGGER.fine("AVG aggregate ran over empty input.");
-                        } else {
-                            try {
-                                if (metNull) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(globalSum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt32.setValue(globalCount);
-                                intSerde.serialize(aInt32, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                        try {
+                            if (metNull || globalCount == 0) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(globalSum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
                             }
+                            countBytes.reset();
+                            aInt32.setValue(globalCount);
+                            intSerde.serialize(aInt32, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
                 };
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
index d9173bd..02d8c91 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalAvgAggregateDescriptor.java
@@ -6,7 +6,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
@@ -82,6 +81,7 @@
                     private DataOutput out = provider.getDataOutput();
                     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
                     private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
+                    private ATypeTag aggType;
                     private double sum;
                     private int count;
 
@@ -105,90 +105,95 @@
                             .getSerializerDeserializer(BuiltinType.ANULL);
                     private AMutableDouble aDouble = new AMutableDouble(0);
                     private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private boolean metNull;
 
                     @Override
                     public void init() {
+                        aggType = ATypeTag.SYSTEM_NULL;
                         sum = 0.0;
                         count = 0;
-                        metNull = false;
                     }
 
                     @Override
                     public void step(IFrameTupleReference tuple) throws AlgebricksException {
                         inputVal.reset();
                         eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ++count;
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
-                                            + typeTag);
-                                }
-                            }
-                            inputVal.reset();
+                        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                                .deserialize(inputVal.getByteArray()[0]);
+                        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+                            aggType = ATypeTag.NULL;
+                            return;
+                        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+                            aggType = typeTag;
+                        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+                            throw new AlgebricksException("Unexpected type " + typeTag
+                                    + " in aggregation input stream. Expected type " + aggType + ".");
                         }
+                        if (typeTag != ATypeTag.SYSTEM_NULL) {
+                            ++count;
+                        }
+                        switch (typeTag) {
+                            case INT8: {
+                                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT16: {
+                                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT32: {
+                                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case INT64: {
+                                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case FLOAT: {
+                                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case DOUBLE: {
+                                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                                sum += val;
+                                break;
+                            }
+                            case NULL: {
+                                break;
+                            }
+                            default: {
+                                throw new NotImplementedException("Cannot compute LOCAL-AVG for values of type "
+                                        + typeTag);
+                            }
+                        }
+                        inputVal.reset();
                     }
 
                     @Override
                     public void finish() throws AlgebricksException {
-                        if (count == 0) {
-                            if (GlobalConfig.DEBUG) {
-                                GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
+                        try {
+                            if (count == 0) {
+                                out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                                return;
                             }
-                        } else {
-                            try {
-                                if (metNull) {
-                                    sumBytes.reset();
-                                    nullSerde.serialize(ANull.NULL, sumBytesOutput);
-                                } else {
-                                    sumBytes.reset();
-                                    aDouble.setValue(sum);
-                                    doubleSerde.serialize(aDouble, sumBytesOutput);
-                                }
-                                countBytes.reset();
-                                aInt32.setValue(count);
-                                int32Serde.serialize(aInt32, countBytesOutput);
-                                recordEval.evaluate(null);
-                            } catch (IOException e) {
-                                throw new AlgebricksException(e);
+                            if (aggType == ATypeTag.NULL) {
+                                sumBytes.reset();
+                                nullSerde.serialize(ANull.NULL, sumBytesOutput);
+                            } else {
+                                sumBytes.reset();
+                                aDouble.setValue(sum);
+                                doubleSerde.serialize(aDouble, sumBytesOutput);
                             }
+                            countBytes.reset();
+                            aInt32.setValue(count);
+                            int32Serde.serialize(aInt32, countBytesOutput);
+                            recordEval.evaluate(null);
+                        } catch (IOException e) {
+                            throw new AlgebricksException(e);
                         }
                     }
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
new file mode 100644
index 0000000..59287d5
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-max",
+            1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new MinMaxAggregateFunction(args, provider, false, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
new file mode 100644
index 0000000..ca32e2f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-min",
+            1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new MinMaxAggregateFunction(args, provider, true, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
new file mode 100644
index 0000000..c133e09
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalSumAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+
+public class LocalSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum",
+            1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalSumAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new SumAggregateFunction(args, provider, true);
+            };
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
index 3bc40b6..18bfe9f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
@@ -1,39 +1,15 @@
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class MaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -50,7 +26,6 @@
         return FID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
@@ -60,152 +35,7 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
-                    private short shortVal = Short.MIN_VALUE;
-                    private int intVal = Integer.MIN_VALUE;
-                    private long longVal = Long.MIN_VALUE;
-                    private float floatVal = Float.MIN_VALUE;
-                    private double doubleVal = Double.MIN_VALUE;
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableFloat aFloat = new AMutableFloat(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-                    private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-                    @SuppressWarnings("rawtypes")
-                    private ISerializerDeserializer serde;
-
-                    @Override
-                    public void init() {
-                        shortVal = Short.MIN_VALUE;
-                        intVal = Integer.MIN_VALUE;
-                        longVal = Long.MIN_VALUE;
-                        floatVal = Float.MIN_VALUE;
-                        doubleVal = Double.MIN_VALUE;
-
-                        metInt8s = false;
-                        metInt16s = false;
-                        metInt32s = false;
-                        metInt64s = false;
-                        metFloats = false;
-                        metDoubles = false;
-                        metNull = false;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    metInt8s = true;
-                                    throw new NotImplementedException("no implementation for int8's comparator");
-                                }
-                                case INT16: {
-                                    metInt16s = true;
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    if (val > shortVal)
-                                        shortVal = val;
-                                    throw new NotImplementedException("no implementation for int16's comparator");
-                                }
-                                case INT32: {
-                                    metInt32s = true;
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    if (val > intVal)
-                                        intVal = val;
-                                    break;
-                                }
-                                case INT64: {
-                                    metInt64s = true;
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    if (val > longVal)
-                                        longVal = val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    metFloats = true;
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    if (val > floatVal)
-                                        floatVal = val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    metDoubles = true;
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    if (val > doubleVal)
-                                        doubleVal = val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                                            + typeTag);
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (metNull) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ANULL);
-                                serde.serialize(ANull.NULL, out);
-                            } else if (metDoubles) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                                aDouble.setValue(doubleVal);
-                                serde.serialize(aDouble, out);
-                            } else if (metFloats) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                                aFloat.setValue(floatVal);
-                                serde.serialize(aFloat, out);
-                            } else if (metInt64s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT64);
-                                aInt64.setValue(longVal);
-                                serde.serialize(aInt64, out);
-                            } else if (metInt32s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT32);
-                                aInt32.setValue(intVal);
-                                serde.serialize(aInt32, out);
-                            } else if (metInt16s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT16);
-                                aInt16.setValue(shortVal);
-                                serde.serialize(aInt16, out);
-                            } else if (metInt8s) {
-                                throw new NotImplementedException("no implementation for int8's comparator");
-                            } else {
-                                GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
+                return new MinMaxAggregateFunction(args, provider, false, false);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
index 6a88a0d..0bf6ddc 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
@@ -1,39 +1,15 @@
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class MinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -50,7 +26,6 @@
         return FID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
@@ -60,154 +35,8 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
-                    private short shortVal = Short.MAX_VALUE;
-                    private int intVal = Integer.MAX_VALUE;
-                    private long longVal = Long.MAX_VALUE;
-                    private float floatVal = Float.MAX_VALUE;
-                    private double doubleVal = Double.MAX_VALUE;
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableFloat aFloat = new AMutableFloat(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-                    private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-                    @SuppressWarnings("rawtypes")
-                    private ISerializerDeserializer serde;
-
-                    @Override
-                    public void init() {
-                        shortVal = Short.MAX_VALUE;
-                        intVal = Integer.MAX_VALUE;
-                        longVal = Long.MAX_VALUE;
-                        floatVal = Float.MAX_VALUE;
-                        doubleVal = Double.MAX_VALUE;
-
-                        metInt8s = false;
-                        metInt16s = false;
-                        metInt32s = false;
-                        metInt64s = false;
-                        metFloats = false;
-                        metDoubles = false;
-                        metNull = false;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    metInt8s = true;
-                                    throw new NotImplementedException("no implementation for int8's comparator");
-                                }
-                                case INT16: {
-                                    metInt16s = true;
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    if (val < shortVal)
-                                        shortVal = val;
-                                    throw new NotImplementedException("no implementation for int16's comparator");
-                                }
-                                case INT32: {
-                                    metInt32s = true;
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    if (val < intVal)
-                                        intVal = val;
-                                    break;
-                                }
-                                case INT64: {
-                                    metInt64s = true;
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    if (val < longVal)
-                                        longVal = val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    metFloats = true;
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    if (val < floatVal)
-                                        floatVal = val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    metDoubles = true;
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    if (val < doubleVal)
-                                        doubleVal = val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                                            + typeTag);
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (metNull) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ANULL);
-                                serde.serialize(ANull.NULL, out);
-                            } else if (metDoubles) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                                aDouble.setValue(doubleVal);
-                                serde.serialize(aDouble, out);
-                            } else if (metFloats) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                                aFloat.setValue(floatVal);
-                                serde.serialize(aFloat, out);
-                            } else if (metInt64s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT64);
-                                aInt64.setValue(longVal);
-                                serde.serialize(aInt64, out);
-                            } else if (metInt32s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT32);
-                                aInt32.setValue(intVal);
-                                serde.serialize(aInt32, out);
-                            } else if (metInt16s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT16);
-                                aInt16.setValue(shortVal);
-                                serde.serialize(aInt16, out);
-                            } else if (metInt8s) {
-                                throw new NotImplementedException("no implementation for int8's comparator");
-                            } else {
-                                GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
+                return new MinMaxAggregateFunction(args, provider, true, false);
             }
         };
     }
-
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
new file mode 100644
index 0000000..bc3508f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinMaxAggregateFunction.java
@@ -0,0 +1,106 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MinMaxAggregateFunction implements ICopyAggregateFunction {
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
+    private DataOutput out;
+    private ICopyEvaluator eval;
+    private ATypeTag aggType;
+    private IBinaryComparator cmp;
+    private final boolean isMin;
+    private final boolean isLocalAgg;
+
+    public MinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin,
+            boolean isLocalAgg) throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isMin = isMin;
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        outputVal.reset();
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+            aggType = ATypeTag.NULL;
+            return;
+        }
+        if (aggType == ATypeTag.SYSTEM_NULL) {
+            if (typeTag == ATypeTag.SYSTEM_NULL) {
+                // Ignore.
+                return;
+            }
+            // First value encountered. Set type, comparator, and initial value.
+            aggType = typeTag;
+            // Set comparator.
+            IBinaryComparatorFactory cmpFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(aggType, isMin);
+            cmp = cmpFactory.createBinaryComparator();
+            // Initialize min value.
+            outputVal.assign(inputVal);
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+            throw new AlgebricksException("Unexpected type " + typeTag + " in aggregation input stream. Expected type "
+                    + aggType + ".");
+        }
+        if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
+                outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
+            outputVal.assign(inputVal);
+        }
+    }
+
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case NULL: {
+                    out.writeByte(ATypeTag.NULL.serialize());
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    // Empty stream. For local agg return system null. For global agg return null.
+                    if (isLocalAgg) {
+                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                    } else {
+                        out.writeByte(ATypeTag.NULL.serialize());
+                    }
+                    break;
+                }
+                default: {
+                    out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
index fa611e2..24e21d9 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateDescriptor.java
@@ -1,41 +1,15 @@
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.AMutableInt8;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class SumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -52,7 +26,6 @@
         return FID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
@@ -62,145 +35,8 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-                    private double sum;
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableFloat aFloat = new AMutableFloat(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-                    private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-                    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
-                    @SuppressWarnings("rawtypes")
-                    private ISerializerDeserializer serde;
-
-                    @Override
-                    public void init() {
-                        metInt8s = false;
-                        metInt16s = false;
-                        metInt32s = false;
-                        metInt64s = false;
-                        metFloats = false;
-                        metDoubles = false;
-                        metNull = false;
-                        sum = 0.0;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    metInt8s = true;
-                                    byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT16: {
-                                    metInt16s = true;
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT32: {
-                                    metInt32s = true;
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case INT64: {
-                                    metInt64s = true;
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    metFloats = true;
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    metDoubles = true;
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    sum += val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                                            + typeTag);
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (metNull) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ANULL);
-                                serde.serialize(ANull.NULL, out);
-                            } else if (metDoubles) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                                aDouble.setValue(sum);
-                                serde.serialize(aDouble, out);
-                            } else if (metFloats) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                                aFloat.setValue((float) sum);
-                                serde.serialize(aFloat, out);
-                            } else if (metInt64s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT64);
-                                aInt64.setValue((long) sum);
-                                serde.serialize(aInt64, out);
-                            } else if (metInt32s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT32);
-                                aInt32.setValue((int) sum);
-                                serde.serialize(aInt32, out);
-                            } else if (metInt16s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT16);
-                                aInt16.setValue((short) sum);
-                                serde.serialize(aInt16, out);
-                            } else if (metInt8s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT8);
-                                aInt8.setValue((byte) sum);
-                                serde.serialize(aInt8, out);
-                            } else {
-                                GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
-                            }
-
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
-            }
+                return new SumAggregateFunction(args, provider, false);
+            };
         };
     }
-
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
new file mode 100644
index 0000000..d0e82ce
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -0,0 +1,192 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SumAggregateFunction implements ICopyAggregateFunction {
+    private DataOutput out;
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private ICopyEvaluator eval;
+    private double sum;
+    private ATypeTag aggType;
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    @SuppressWarnings("rawtypes")
+    private ISerializerDeserializer serde;
+
+    private final boolean isLocalAgg;
+
+    public SumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isLocalAgg = isLocalAgg;
+    }
+
+    @Override
+    public void init() {
+        aggType = ATypeTag.SYSTEM_NULL;
+        sum = 0.0;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+        if (typeTag == ATypeTag.NULL || aggType == ATypeTag.NULL) {
+            aggType = ATypeTag.NULL;
+            return;
+        } else if (aggType == ATypeTag.SYSTEM_NULL) {
+            aggType = typeTag;
+        } else if (typeTag != ATypeTag.SYSTEM_NULL && typeTag != aggType) {
+            throw new AlgebricksException("Unexpected type " + typeTag
+                    + " in aggregation input stream. Expected type " + aggType + ".");
+        }
+        switch (typeTag) {
+            case INT8: {
+                byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT16: {
+                short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT32: {
+                int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case INT64: {
+                long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case FLOAT: {
+                float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case DOUBLE: {
+                double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                sum += val;
+                break;
+            }
+            case NULL: {
+                break;
+            }
+            case SYSTEM_NULL: {
+                // For global aggregates simply ignore system null here,
+                // but if all input value are system null, then we should return
+                // null in finish().
+                if (isLocalAgg) {
+                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+                }
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Cannot compute SUM for values of type " + typeTag + ".");
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            switch (aggType) {
+                case INT8: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
+                    aInt8.setValue((byte) sum);
+                    serde.serialize(aInt8, out);
+                    break;
+                }
+                case INT16: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
+                    aInt16.setValue((short) sum);
+                    serde.serialize(aInt16, out);
+                    break;
+                }
+                case INT32: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+                    aInt32.setValue((int) sum);
+                    serde.serialize(aInt32, out);
+                    break;
+                }
+                case INT64: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+                    aInt64.setValue((long) sum);
+                    serde.serialize(aInt64, out);
+                    break;
+                }
+                case FLOAT: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
+                    aFloat.setValue((float) sum);
+                    serde.serialize(aFloat, out);
+                    break;
+                }
+                case DOUBLE: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    aDouble.setValue(sum);
+                    serde.serialize(aDouble, out);
+                    break;
+                }
+                case NULL: {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    // Empty stream. For local agg return system null. For global agg return null.
+                    if (isLocalAgg) {
+                        out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                    } else {
+                        serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                        serde.serialize(ANull.NULL, out);
+                    }
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java
new file mode 100644
index 0000000..587a113
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/stream/EmptyStreamAggregateDescriptor.java
@@ -0,0 +1,89 @@
+package edu.uci.ics.asterix.runtime.aggregates.stream;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class EmptyStreamAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "empty-stream", 0);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new EmptyStreamAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+
+                return new ICopyAggregateFunction() {
+
+                    private DataOutput out = provider.getDataOutput();
+                    @SuppressWarnings("rawtypes")
+                    private ISerializerDeserializer serde = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+
+                    boolean res = true;
+
+                    @Override
+                    public void init() throws AlgebricksException {
+                        res = true;
+                    }
+
+                    @Override
+                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+                        res = false;
+                    }
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void finish() throws AlgebricksException {
+                        ABoolean b = res ? ABoolean.TRUE : ABoolean.FALSE;
+                        try {
+                            serde.serialize(b, out);
+                        } catch (HyracksDataException e) {
+                            throw new AlgebricksException(e);
+                        }
+                    }
+
+                    @Override
+                    public void finishPartial() throws AlgebricksException {
+                        finish();
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/OrderedListConstructorDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/OrderedListConstructorDescriptor.java
index 496c524..af02cdd 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/OrderedListConstructorDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/OrderedListConstructorDescriptor.java
@@ -96,7 +96,7 @@
                     try {
                         for (int i = 0; i < argEvals.length; i++) {
                             inputVal.reset();
-                            argEvals[i].evaluate(tuple);
+                            argEvals[i].evaluate(tuple);                            
                             builder.addItem(inputVal);
                         }
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 86a02af..268d563 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -41,18 +41,28 @@
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.aggregates.collections.ListifyAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarCountAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.scalar.ScalarSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableCountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableGlobalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableLocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.serializable.std.SerializableSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.AvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.SumAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.stream.EmptyStreamAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.stream.NonEmptyStreamAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.common.CreateMBREvalFactory;
 import edu.uci.ics.asterix.runtime.evaluators.common.FieldAccessByIndexEvalFactory;
@@ -80,6 +90,7 @@
 import edu.uci.ics.asterix.runtime.evaluators.functions.AnyCollectionMemberDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.ClosedRecordConstructorDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.CodePointToStringDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.ContainsDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedGramTokensDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedWordTokensDescriptor;
@@ -107,10 +118,16 @@
 import edu.uci.ics.asterix.runtime.evaluators.functions.LenDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.LikeDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAbsDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAddDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericCeilingDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericDivideDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericFloorDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericModuloDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericMultiplyDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEven2Descriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEvenDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericSubtractDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericUnaryMinusDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.OpenRecordConstructorDescriptor;
@@ -129,33 +146,26 @@
 import edu.uci.ics.asterix.runtime.evaluators.functions.SpatialDistanceDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.SpatialIntersectDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.StartsWithDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringConcatDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringEndWithDescrtiptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringEqualDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringJoinDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringLengthDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringLowerCaseDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesWithFlagDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceWithFlagsDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringStartWithDescrtiptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringToCodePointDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.Substring2Descriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringAfterDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringBeforeDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.SwitchCaseDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.UnorderedListConstructorDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.WordTokensDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.YearDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAbsDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericCeilingDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericFloorDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEvenDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEven2Descriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringEqualDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringStartWithDescrtiptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringEndWithDescrtiptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringLowerCaseDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesWithFlagDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceWithFlagsDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringLengthDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.Substring2Descriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringBeforeDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringAfterDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringToCodePointDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.CodePointToStringDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringConcatDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringJoinDescriptor;
 import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
 import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
 import edu.uci.ics.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
@@ -265,6 +275,7 @@
         temp.add(IsNullDescriptor.FACTORY);
         temp.add(NotDescriptor.FACTORY);
         temp.add(LenDescriptor.FACTORY);
+        temp.add(EmptyStreamAggregateDescriptor.FACTORY);
         temp.add(NonEmptyStreamAggregateDescriptor.FACTORY);
         temp.add(RangeDescriptor.FACTORY);
 
@@ -300,8 +311,11 @@
         temp.add(LocalAvgAggregateDescriptor.FACTORY);
         temp.add(GlobalAvgAggregateDescriptor.FACTORY);
         temp.add(SumAggregateDescriptor.FACTORY);
+        temp.add(LocalSumAggregateDescriptor.FACTORY);
         temp.add(MaxAggregateDescriptor.FACTORY);
+        temp.add(LocalMaxAggregateDescriptor.FACTORY);
         temp.add(MinAggregateDescriptor.FACTORY);
+        temp.add(LocalMinAggregateDescriptor.FACTORY);
 
         // serializable aggregates
         temp.add(SerializableCountAggregateDescriptor.FACTORY);
@@ -309,7 +323,15 @@
         temp.add(SerializableLocalAvgAggregateDescriptor.FACTORY);
         temp.add(SerializableGlobalAvgAggregateDescriptor.FACTORY);
         temp.add(SerializableSumAggregateDescriptor.FACTORY);
+        temp.add(SerializableLocalSumAggregateDescriptor.FACTORY);
 
+        // scalar aggregates
+        temp.add(ScalarCountAggregateDescriptor.FACTORY);
+        temp.add(ScalarAvgAggregateDescriptor.FACTORY);
+        temp.add(ScalarSumAggregateDescriptor.FACTORY);
+        temp.add(ScalarMaxAggregateDescriptor.FACTORY);
+        temp.add(ScalarMinAggregateDescriptor.FACTORY);
+        
         // new functions - constructors
         temp.add(ABooleanConstructorDescriptor.FACTORY);
         temp.add(ANullConstructorDescriptor.FACTORY);
@@ -518,7 +540,7 @@
         typeInference(expr, fd, context);
         return fd;
     }
-
+    
     private void typeInference(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context)
             throws AlgebricksException {
         if (fd.getIdentifier().equals(AsterixBuiltinFunctions.LISTIFY)) {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index a66f775..ef89e9f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -55,7 +55,7 @@
         return new ScanCollectionUnnestingFunctionFactory(args[0]);
     }
 
-    private static class ScanCollectionUnnestingFunctionFactory implements ICopyUnnestingFunctionFactory {
+    public static class ScanCollectionUnnestingFunctionFactory implements ICopyUnnestingFunctionFactory {
 
         private static final long serialVersionUID = 1L;
         private ICopyEvaluatorFactory listEvalFactory;