Added local min and max aggregate functions. Changed min and max aggregate functions to handle system null properly. Enabled tests.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_agg@529 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 82051f9..43701fa 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -38,7 +38,3 @@
 quantifiers/everysat_02.aql
 quantifiers/everysat_03.aql
 aggregate/avg_empty_02.aql
-aggregate/min_empty_01.aql
-aggregate/min_empty_02.aql
-aggregate/max_empty_01.aql
-aggregate/max_empty_02.aql
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 8669ef0..2af5d9d 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -236,7 +236,9 @@
     public final static FunctionIdentifier SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-sum", 1);
     public final static FunctionIdentifier LOCAL_SUM = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-sum", 1);
     public final static FunctionIdentifier MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-max", 1);
+    public final static FunctionIdentifier LOCAL_MAX = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-max", 1);
     public final static FunctionIdentifier MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-min", 1);
+    public final static FunctionIdentifier LOCAL_MIN = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-min", 1);
     public final static FunctionIdentifier GLOBAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "agg-global-avg", 1);
     public final static FunctionIdentifier LOCAL_AVG = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -517,7 +519,9 @@
         add(MAKE_FIELD_INDEX_HANDLE, null); // TODO
         add(MAKE_FIELD_NAME_HANDLE, null); // TODO
         add(MAX, NonTaggedSumTypeComputer.INSTANCE);
+        add(LOCAL_MAX, NonTaggedSumTypeComputer.INSTANCE);
         add(MIN, NonTaggedSumTypeComputer.INSTANCE);
+        add(LOCAL_MIN, NonTaggedSumTypeComputer.INSTANCE);
         add(NON_EMPTY_STREAM, ABooleanTypeComputer.INSTANCE);
         add(NULL_CONSTRUCTOR, ANullTypeComputer.INSTANCE);
         add(NUMERIC_UNARY_MINUS, NonTaggedUnaryMinusTypeComputer.INSTANCE);
@@ -676,14 +680,16 @@
         addGlobalAgg(COUNT, SUM);
 
         addAgg(MAX);
-        addLocalAgg(MAX, MAX);
+        addAgg(LOCAL_MAX);
+        addLocalAgg(MAX, LOCAL_MAX);
         addGlobalAgg(MAX, MAX);
 
         addAgg(MIN);
-        addLocalAgg(MIN, MIN);
+        addLocalAgg(MIN, LOCAL_MIN);
         addGlobalAgg(MIN, MIN);
 
         addAgg(SUM);
+        addAgg(LOCAL_SUM);
         addLocalAgg(SUM, LOCAL_SUM);
         addGlobalAgg(SUM, SUM);
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index 9831c28..ef5f6d1 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -57,7 +57,6 @@
             state.writeBoolean(false);
             state.writeBoolean(false);
             state.writeBoolean(false);
-            state.writeBoolean(false);
             state.writeDouble(0.0);
         } catch (IOException e) {
             throw new AlgebricksException(e);
@@ -75,7 +74,6 @@
         boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
         boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
         boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
-        boolean metSystemNull = BufferSerDeUtil.getBoolean(state, pos++);
         double sum = BufferSerDeUtil.getDouble(state, pos);
 
         inputVal.reset();
@@ -125,7 +123,6 @@
                     break;
                 }
                 case SYSTEM_NULL: {
-                    metSystemNull = true;
                     // For global aggregates simply ignore system null here,
                     // but if all input value are system null, then we should return
                     // null in finish().
@@ -149,7 +146,6 @@
         BufferSerDeUtil.writeBoolean(metFloats, state, pos++);
         BufferSerDeUtil.writeBoolean(metDoubles, state, pos++);
         BufferSerDeUtil.writeBoolean(metNull, state, pos++);
-        BufferSerDeUtil.writeBoolean(metSystemNull, state, pos++);
         BufferSerDeUtil.writeDouble(sum, state, pos);
     }
 
@@ -164,7 +160,6 @@
         boolean metFloats = BufferSerDeUtil.getBoolean(state, pos++);
         boolean metDoubles = BufferSerDeUtil.getBoolean(state, pos++);
         boolean metNull = BufferSerDeUtil.getBoolean(state, pos++);
-        boolean metSystemNull = BufferSerDeUtil.getBoolean(state, pos++);
         double sum = BufferSerDeUtil.getDouble(state, pos);
         try {
             if (metNull) {
@@ -201,10 +196,6 @@
                         .getSerializerDeserializer(BuiltinType.AINT8);
                 aInt8.setValue((byte) sum);
                 serde.serialize(aInt8, out);
-            } else if (metSystemNull) {
-                // All input values must have been of type system null.
-                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                serde.serialize(ANull.NULL, out);
             } else {
                 // Empty stream. For local agg return system null. For global agg return null.
                 if (isLocalAgg) {
@@ -214,7 +205,6 @@
                     serde.serialize(ANull.NULL, out);
                 }
             }
-
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
new file mode 100644
index 0000000..11d83b6
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMaxAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalMaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-max",
+            1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalMaxAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new MaxAggregateFunction(args, provider, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
new file mode 100644
index 0000000..2fc219f
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/LocalMinAggregateDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import edu.uci.ics.asterix.common.functions.FunctionConstants;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
+import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
+import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+
+public class LocalMinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "agg-local-min",
+            1);
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new LocalMinAggregateDescriptor();
+        }
+    };
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return FID;
+    }
+
+    @Override
+    public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new ICopyAggregateFunctionFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+                    throws AlgebricksException {
+                return new MinAggregateFunction(args, provider, true);
+            }
+        };
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
index e409910..a672b19 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateDescriptor.java
@@ -1,39 +1,15 @@
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class MaxAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -50,7 +26,6 @@
         return FID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
@@ -60,152 +35,7 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
-                    private short shortVal = Short.MIN_VALUE;
-                    private int intVal = Integer.MIN_VALUE;
-                    private long longVal = Long.MIN_VALUE;
-                    private float floatVal = Float.MIN_VALUE;
-                    private double doubleVal = Double.MIN_VALUE;
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableFloat aFloat = new AMutableFloat(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-                    private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-                    @SuppressWarnings("rawtypes")
-                    private ISerializerDeserializer serde;
-
-                    @Override
-                    public void init() {
-                        shortVal = Short.MIN_VALUE;
-                        intVal = Integer.MIN_VALUE;
-                        longVal = Long.MIN_VALUE;
-                        floatVal = Float.MIN_VALUE;
-                        doubleVal = Double.MIN_VALUE;
-
-                        metInt8s = false;
-                        metInt16s = false;
-                        metInt32s = false;
-                        metInt64s = false;
-                        metFloats = false;
-                        metDoubles = false;
-                        metNull = false;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    metInt8s = true;
-                                    throw new NotImplementedException("no implementation for int8's comparator");
-                                }
-                                case INT16: {
-                                    metInt16s = true;
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    if (val > shortVal)
-                                        shortVal = val;
-                                    throw new NotImplementedException("no implementation for int16's comparator");
-                                }
-                                case INT32: {
-                                    metInt32s = true;
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    if (val > intVal)
-                                        intVal = val;
-                                    break;
-                                }
-                                case INT64: {
-                                    metInt64s = true;
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    if (val > longVal)
-                                        longVal = val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    metFloats = true;
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    if (val > floatVal)
-                                        floatVal = val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    metDoubles = true;
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    if (val > doubleVal)
-                                        doubleVal = val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                                            + typeTag);
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (metNull) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ANULL);
-                                serde.serialize(ANull.NULL, out);
-                            } else if (metDoubles) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                                aDouble.setValue(doubleVal);
-                                serde.serialize(aDouble, out);
-                            } else if (metFloats) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                                aFloat.setValue(floatVal);
-                                serde.serialize(aFloat, out);
-                            } else if (metInt64s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT64);
-                                aInt64.setValue(longVal);
-                                serde.serialize(aInt64, out);
-                            } else if (metInt32s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT32);
-                                aInt32.setValue(intVal);
-                                serde.serialize(aInt32, out);
-                            } else if (metInt16s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT16);
-                                aInt16.setValue(shortVal);
-                                serde.serialize(aInt16, out);
-                            } else if (metInt8s) {
-                                throw new NotImplementedException("no implementation for int8's comparator");
-                            } else {
-                                GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
+                return new MaxAggregateFunction(args, provider, false);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
new file mode 100644
index 0000000..3f86c80
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MaxAggregateFunction.java
@@ -0,0 +1,198 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MaxAggregateFunction implements ICopyAggregateFunction {
+    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+    private DataOutput out;    
+    private ICopyEvaluator eval;
+    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
+
+    private short shortVal = Short.MIN_VALUE;
+    private int intVal = Integer.MIN_VALUE;
+    private long longVal = Long.MIN_VALUE;
+    private float floatVal = Float.MIN_VALUE;
+    private double doubleVal = Double.MIN_VALUE;
+
+    private AMutableDouble aDouble = new AMutableDouble(0);
+    private AMutableFloat aFloat = new AMutableFloat(0);
+    private AMutableInt64 aInt64 = new AMutableInt64(0);
+    private AMutableInt32 aInt32 = new AMutableInt32(0);
+    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+    @SuppressWarnings("rawtypes")
+    private ISerializerDeserializer serde;
+    private final boolean isLocalAgg;
+    
+    public MaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isLocalAgg = isLocalAgg;
+    }
+    
+    @Override
+    public void init() {
+        shortVal = Short.MIN_VALUE;
+        intVal = Integer.MIN_VALUE;
+        longVal = Long.MIN_VALUE;
+        floatVal = Float.MIN_VALUE;
+        doubleVal = Double.MIN_VALUE;
+
+        metInt8s = false;
+        metInt16s = false;
+        metInt32s = false;
+        metInt64s = false;
+        metFloats = false;
+        metDoubles = false;
+        metNull = false;
+    }
+
+    @Override
+    public void step(IFrameTupleReference tuple) throws AlgebricksException {
+        inputVal.reset();
+        eval.evaluate(tuple);
+        if (inputVal.getLength() > 0) {
+            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
+                    .getByteArray()[0]);
+            switch (typeTag) {
+                case INT8: {
+                    metInt8s = true;
+                    throw new NotImplementedException("no implementation for int8's comparator");
+                }
+                case INT16: {
+                    metInt16s = true;
+                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+                    if (val > shortVal)
+                        shortVal = val;
+                    throw new NotImplementedException("no implementation for int16's comparator");
+                }
+                case INT32: {
+                    metInt32s = true;
+                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+                    if (val > intVal)
+                        intVal = val;
+                    break;
+                }
+                case INT64: {
+                    metInt64s = true;
+                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+                    if (val > longVal)
+                        longVal = val;
+                    break;
+                }
+                case FLOAT: {
+                    metFloats = true;
+                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+                    if (val > floatVal)
+                        floatVal = val;
+                    break;
+                }
+                case DOUBLE: {
+                    metDoubles = true;
+                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+                    if (val > doubleVal)
+                        doubleVal = val;
+                    break;
+                }
+                case NULL: {
+                    metNull = true;
+                    break;
+                }
+                case SYSTEM_NULL: {
+                    // For global aggregates simply ignore system null here,
+                    // but if all input value are system null, then we should return
+                    // null in finish().
+                    if (isLocalAgg) {
+                        throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+                    }
+                    break;
+                }
+                default: {
+                    throw new NotImplementedException("Cannot compute SUM for values of type "
+                            + typeTag);
+                }
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish() throws AlgebricksException {
+        try {
+            if (metNull) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.ANULL);
+                serde.serialize(ANull.NULL, out);
+            } else if (metDoubles) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
+                aDouble.setValue(doubleVal);
+                serde.serialize(aDouble, out);
+            } else if (metFloats) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AFLOAT);
+                aFloat.setValue(floatVal);
+                serde.serialize(aFloat, out);
+            } else if (metInt64s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AINT64);
+                aInt64.setValue(longVal);
+                serde.serialize(aInt64, out);
+            } else if (metInt32s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AINT32);
+                aInt32.setValue(intVal);
+                serde.serialize(aInt32, out);
+            } else if (metInt16s) {
+                serde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.AINT16);
+                aInt16.setValue(shortVal);
+                serde.serialize(aInt16, out);
+            } else if (metInt8s) {
+                throw new NotImplementedException("no implementation for int8's comparator");
+            } else {
+                // Empty stream. For local agg return system null. For global agg return null.
+                if (isLocalAgg) {
+                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                } else {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                }
+            }
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
+    public void finishPartial() throws AlgebricksException {
+        finish();
+    }
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
index af54c14..0d2b68b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateDescriptor.java
@@ -1,39 +1,15 @@
 package edu.uci.ics.asterix.runtime.aggregates.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.AMutableDouble;
-import edu.uci.ics.asterix.om.base.AMutableFloat;
-import edu.uci.ics.asterix.om.base.AMutableInt16;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt64;
-import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
 import edu.uci.ics.asterix.om.functions.IFunctionDescriptorFactory;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.EnumDeserializer;
 import edu.uci.ics.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class MinAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
@@ -50,7 +26,6 @@
         return FID;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
             throws AlgebricksException {
@@ -60,154 +35,8 @@
             @Override
             public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
                     throws AlgebricksException {
-
-                return new ICopyAggregateFunction() {
-
-                    private DataOutput out = provider.getDataOutput();
-                    private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(inputVal);
-                    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
-
-                    private short shortVal = Short.MAX_VALUE;
-                    private int intVal = Integer.MAX_VALUE;
-                    private long longVal = Long.MAX_VALUE;
-                    private float floatVal = Float.MAX_VALUE;
-                    private double doubleVal = Double.MAX_VALUE;
-
-                    private AMutableDouble aDouble = new AMutableDouble(0);
-                    private AMutableFloat aFloat = new AMutableFloat(0);
-                    private AMutableInt64 aInt64 = new AMutableInt64(0);
-                    private AMutableInt32 aInt32 = new AMutableInt32(0);
-                    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-                    @SuppressWarnings("rawtypes")
-                    private ISerializerDeserializer serde;
-
-                    @Override
-                    public void init() {
-                        shortVal = Short.MAX_VALUE;
-                        intVal = Integer.MAX_VALUE;
-                        longVal = Long.MAX_VALUE;
-                        floatVal = Float.MAX_VALUE;
-                        doubleVal = Double.MAX_VALUE;
-
-                        metInt8s = false;
-                        metInt16s = false;
-                        metInt32s = false;
-                        metInt64s = false;
-                        metFloats = false;
-                        metDoubles = false;
-                        metNull = false;
-                    }
-
-                    @Override
-                    public void step(IFrameTupleReference tuple) throws AlgebricksException {
-                        inputVal.reset();
-                        eval.evaluate(tuple);
-                        if (inputVal.getLength() > 0) {
-                            ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal
-                                    .getByteArray()[0]);
-                            switch (typeTag) {
-                                case INT8: {
-                                    metInt8s = true;
-                                    throw new NotImplementedException("no implementation for int8's comparator");
-                                }
-                                case INT16: {
-                                    metInt16s = true;
-                                    short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
-                                    if (val < shortVal)
-                                        shortVal = val;
-                                    throw new NotImplementedException("no implementation for int16's comparator");
-                                }
-                                case INT32: {
-                                    metInt32s = true;
-                                    int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
-                                    if (val < intVal)
-                                        intVal = val;
-                                    break;
-                                }
-                                case INT64: {
-                                    metInt64s = true;
-                                    long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
-                                    if (val < longVal)
-                                        longVal = val;
-                                    break;
-                                }
-                                case FLOAT: {
-                                    metFloats = true;
-                                    float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
-                                    if (val < floatVal)
-                                        floatVal = val;
-                                    break;
-                                }
-                                case DOUBLE: {
-                                    metDoubles = true;
-                                    double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
-                                    if (val < doubleVal)
-                                        doubleVal = val;
-                                    break;
-                                }
-                                case NULL: {
-                                    metNull = true;
-                                    break;
-                                }
-                                default: {
-                                    throw new NotImplementedException("Cannot compute SUM for values of type "
-                                            + typeTag);
-                                }
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void finish() throws AlgebricksException {
-                        try {
-                            if (metNull) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ANULL);
-                                serde.serialize(ANull.NULL, out);
-                            } else if (metDoubles) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.ADOUBLE);
-                                aDouble.setValue(doubleVal);
-                                serde.serialize(aDouble, out);
-                            } else if (metFloats) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AFLOAT);
-                                aFloat.setValue(floatVal);
-                                serde.serialize(aFloat, out);
-                            } else if (metInt64s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT64);
-                                aInt64.setValue(longVal);
-                                serde.serialize(aInt64, out);
-                            } else if (metInt32s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT32);
-                                aInt32.setValue(intVal);
-                                serde.serialize(aInt32, out);
-                            } else if (metInt16s) {
-                                serde = AqlSerializerDeserializerProvider.INSTANCE
-                                        .getSerializerDeserializer(BuiltinType.AINT16);
-                                aInt16.setValue(shortVal);
-                                serde.serialize(aInt16, out);
-                            } else if (metInt8s) {
-                                throw new NotImplementedException("no implementation for int8's comparator");
-                            } else {
-                                GlobalConfig.ASTERIX_LOGGER.fine("SUM aggregate ran over empty input.");
-                            }
-                        } catch (IOException e) {
-                            throw new AlgebricksException(e);
-                        }
-
-                    }
-
-                    @Override
-                    public void finishPartial() throws AlgebricksException {
-                        finish();
-                    }
-                };
+                return new MinAggregateFunction(args, provider, false);
             }
         };
     }
-
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
new file mode 100644
index 0000000..0fd2355
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/MinAggregateFunction.java
@@ -0,0 +1,205 @@
+package edu.uci.ics.asterix.runtime.aggregates.std;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IDataOutputProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class MinAggregateFunction implements ICopyAggregateFunction {	
+	private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+	private DataOutput out;
+	private ICopyEvaluator eval;
+	private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats,
+			metDoubles, metNull;
+	private short shortVal = Short.MAX_VALUE;
+	private int intVal = Integer.MAX_VALUE;
+	private long longVal = Long.MAX_VALUE;
+	private float floatVal = Float.MAX_VALUE;
+	private double doubleVal = Double.MAX_VALUE;
+
+	private AMutableDouble aDouble = new AMutableDouble(0);
+	private AMutableFloat aFloat = new AMutableFloat(0);
+	private AMutableInt64 aInt64 = new AMutableInt64(0);
+	private AMutableInt32 aInt32 = new AMutableInt32(0);
+	private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
+	@SuppressWarnings("rawtypes")
+	private ISerializerDeserializer serde;
+	private final boolean isLocalAgg;
+	
+    public MinAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isLocalAgg)
+            throws AlgebricksException {
+        out = provider.getDataOutput();
+        eval = args[0].createEvaluator(inputVal);
+        this.isLocalAgg = isLocalAgg;
+    }
+	
+	@Override
+	public void init() {
+		shortVal = Short.MAX_VALUE;
+		intVal = Integer.MAX_VALUE;
+		longVal = Long.MAX_VALUE;
+		floatVal = Float.MAX_VALUE;
+		doubleVal = Double.MAX_VALUE;
+
+		metInt8s = false;
+		metInt16s = false;
+		metInt32s = false;
+		metInt64s = false;
+		metFloats = false;
+		metDoubles = false;
+		metNull = false;
+	}
+
+	@Override
+	public void step(IFrameTupleReference tuple) throws AlgebricksException {
+		inputVal.reset();
+		eval.evaluate(tuple);
+		if (inputVal.getLength() > 0) {
+			ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+					.deserialize(inputVal.getByteArray()[0]);
+			switch (typeTag) {
+			case INT8: {
+				metInt8s = true;
+				throw new NotImplementedException(
+						"no implementation for int8's comparator");
+			}
+			case INT16: {
+				metInt16s = true;
+				short val = AInt16SerializerDeserializer.getShort(
+						inputVal.getByteArray(), 1);
+				if (val < shortVal)
+					shortVal = val;
+				throw new NotImplementedException(
+						"no implementation for int16's comparator");
+			}
+			case INT32: {
+				metInt32s = true;
+				int val = AInt32SerializerDeserializer.getInt(
+						inputVal.getByteArray(), 1);
+				if (val < intVal)
+					intVal = val;
+				break;
+			}
+			case INT64: {
+				metInt64s = true;
+				long val = AInt64SerializerDeserializer.getLong(
+						inputVal.getByteArray(), 1);
+				if (val < longVal)
+					longVal = val;
+				break;
+			}
+			case FLOAT: {
+				metFloats = true;
+				float val = AFloatSerializerDeserializer.getFloat(
+						inputVal.getByteArray(), 1);
+				if (val < floatVal)
+					floatVal = val;
+				break;
+			}
+			case DOUBLE: {
+				metDoubles = true;
+				double val = ADoubleSerializerDeserializer.getDouble(
+						inputVal.getByteArray(), 1);
+				if (val < doubleVal)
+					doubleVal = val;
+				break;
+			}
+			case NULL: {
+				metNull = true;
+				break;
+			}
+			case SYSTEM_NULL: {
+                // For global aggregates simply ignore system null here,
+                // but if all input value are system null, then we should return
+                // null in finish().
+                if (isLocalAgg) {
+                    throw new AlgebricksException("Type SYSTEM_NULL encountered in local aggregate.");
+                }
+                break;
+			}
+			default: {
+				throw new NotImplementedException(
+						"Cannot compute SUM for values of type " + typeTag);
+			}
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+    @Override
+	public void finish() throws AlgebricksException {
+		try {
+			if (metNull) {
+				serde = AqlSerializerDeserializerProvider.INSTANCE
+						.getSerializerDeserializer(BuiltinType.ANULL);
+				serde.serialize(ANull.NULL, out);
+			} else if (metDoubles) {
+				serde = AqlSerializerDeserializerProvider.INSTANCE
+						.getSerializerDeserializer(BuiltinType.ADOUBLE);
+				aDouble.setValue(doubleVal);
+				serde.serialize(aDouble, out);
+			} else if (metFloats) {
+				serde = AqlSerializerDeserializerProvider.INSTANCE
+						.getSerializerDeserializer(BuiltinType.AFLOAT);
+				aFloat.setValue(floatVal);
+				serde.serialize(aFloat, out);
+			} else if (metInt64s) {
+				serde = AqlSerializerDeserializerProvider.INSTANCE
+						.getSerializerDeserializer(BuiltinType.AINT64);
+				aInt64.setValue(longVal);
+				serde.serialize(aInt64, out);
+			} else if (metInt32s) {
+				serde = AqlSerializerDeserializerProvider.INSTANCE
+						.getSerializerDeserializer(BuiltinType.AINT32);
+				aInt32.setValue(intVal);
+				serde.serialize(aInt32, out);
+			} else if (metInt16s) {
+				serde = AqlSerializerDeserializerProvider.INSTANCE
+						.getSerializerDeserializer(BuiltinType.AINT16);
+				aInt16.setValue(shortVal);
+				serde.serialize(aInt16, out);
+			} else if (metInt8s) {
+				throw new NotImplementedException(
+						"no implementation for int8's comparator");
+			} else {
+                // Empty stream. For local agg return system null. For global agg return null.
+                if (isLocalAgg) {
+                    out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
+                } else {
+                    serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    serde.serialize(ANull.NULL, out);
+                }
+            }
+		} catch (IOException e) {
+			throw new AlgebricksException(e);
+		}
+	}
+
+	@Override
+	public void finishPartial() throws AlgebricksException {
+		finish();
+	}
+}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
index 282d7b3..de33431 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/std/SumAggregateFunction.java
@@ -34,7 +34,7 @@
     private DataOutput out;
     private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
     private ICopyEvaluator eval;
-    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull, metSystemNull;
+    private boolean metInt8s, metInt16s, metInt32s, metInt64s, metFloats, metDoubles, metNull;
     private double sum;
     private AMutableDouble aDouble = new AMutableDouble(0);
     private AMutableFloat aFloat = new AMutableFloat(0);
@@ -63,7 +63,6 @@
         metFloats = false;
         metDoubles = false;
         metNull = false;
-        metSystemNull = false;
         sum = 0.0;
     }
 
@@ -115,7 +114,6 @@
                     break;
                 }
                 case SYSTEM_NULL: {
-                    metSystemNull = true;
                     // For global aggregates simply ignore system null here,
                     // but if all input value are system null, then we should return
                     // null in finish().
@@ -162,10 +160,6 @@
                 serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
                 aInt8.setValue((byte) sum);
                 serde.serialize(aInt8, out);
-            } else if (metSystemNull) {
-                // All input values must have been of type system null.
-                serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                serde.serialize(ANull.NULL, out);
             } else {
                 // Empty stream. For local agg return system null. For global agg return null.
                 if (isLocalAgg) {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index e17a4dd..001cf1b 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -51,6 +51,8 @@
 import edu.uci.ics.asterix.runtime.aggregates.std.CountAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.GlobalAvgAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalAvgAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalMaxAggregateDescriptor;
+import edu.uci.ics.asterix.runtime.aggregates.std.LocalMinAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.LocalSumAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MaxAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.aggregates.std.MinAggregateDescriptor;
@@ -82,6 +84,7 @@
 import edu.uci.ics.asterix.runtime.evaluators.functions.AnyCollectionMemberDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.CastRecordDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.ClosedRecordConstructorDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.CodePointToStringDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.ContainsDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedGramTokensDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.CountHashedWordTokensDescriptor;
@@ -109,10 +112,16 @@
 import edu.uci.ics.asterix.runtime.evaluators.functions.LenDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.LikeDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAbsDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAddDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericCeilingDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericDivideDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericFloorDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericModuloDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericMultiplyDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEven2Descriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEvenDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericSubtractDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.NumericUnaryMinusDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.OpenRecordConstructorDescriptor;
@@ -131,33 +140,26 @@
 import edu.uci.ics.asterix.runtime.evaluators.functions.SpatialDistanceDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.SpatialIntersectDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.StartsWithDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringConcatDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringEndWithDescrtiptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringEqualDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringJoinDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringLengthDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringLowerCaseDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesWithFlagDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceWithFlagsDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringStartWithDescrtiptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.StringToCodePointDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.Substring2Descriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringAfterDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringBeforeDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.SwitchCaseDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.UnorderedListConstructorDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.WordTokensDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.YearDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericAbsDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericCeilingDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericFloorDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEvenDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.NumericRoundHalfToEven2Descriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringEqualDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringStartWithDescrtiptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringEndWithDescrtiptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringLowerCaseDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringMatchesWithFlagDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringReplaceWithFlagsDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringLengthDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.Substring2Descriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringBeforeDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.SubstringAfterDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringToCodePointDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.CodePointToStringDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringConcatDescriptor;
-import edu.uci.ics.asterix.runtime.evaluators.functions.StringJoinDescriptor;
 import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
 import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
 import edu.uci.ics.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
@@ -304,7 +306,9 @@
         temp.add(SumAggregateDescriptor.FACTORY);
         temp.add(LocalSumAggregateDescriptor.FACTORY);
         temp.add(MaxAggregateDescriptor.FACTORY);
+        temp.add(LocalMaxAggregateDescriptor.FACTORY);
         temp.add(MinAggregateDescriptor.FACTORY);
+        temp.add(LocalMinAggregateDescriptor.FACTORY);
 
         // serializable aggregates
         temp.add(SerializableCountAggregateDescriptor.FACTORY);