[ASTERIXDB-2564][RT] Too many objects created in min() and max()

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
During min() and max() aggregation, the functions keep track of
the aggregation type in order to handle heterogeneous  lists.
It promotes the aggregation type if needed (e.g. encountered double).
Don't switch to new aggregation type and create a new comparator
when the new input value type is the same as the previously
aggregated values. That is because canPromote(agg_type, new_val_type)
will always return true for same types.

Change-Id: I0bb9f0715985ae555de00bbf3173c80371d8968b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3391
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index 86ae924..90f006d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -30,7 +30,6 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
@@ -39,18 +38,16 @@
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public abstract class AbstractMinMaxAggregateFunction extends AbstractAggregateFunction {
-    private IPointable inputVal = new VoidPointable();
-    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
-
-    protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
-    private IScalarEvaluator eval;
+    protected final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    private final IPointable inputVal = new VoidPointable();
+    private final ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
+    private final ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
+    private final IScalarEvaluator eval;
+    private final boolean isMin;
     protected ATypeTag aggType;
     private IBinaryComparator cmp;
-    private ITypeConvertComputer tpc;
-    private final boolean isMin;
 
-    public AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean isMin,
+    AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean isMin,
             SourceLocation sourceLoc) throws HyracksDataException {
         super(sourceLoc);
         eval = args[0].createScalarEvaluator(context);
@@ -82,9 +79,8 @@
             // First value encountered. Set type, comparator, and initial value.
             aggType = typeTag;
             // Set comparator.
-            IBinaryComparatorFactory cmpFactory =
-                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin);
-            cmp = cmpFactory.createBinaryComparator();
+            cmp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
+                    .createBinaryComparator();
             // Initialize min value.
             outputVal.assign(inputVal);
         } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
@@ -94,56 +90,27 @@
                 throw new IncompatibleTypeException(sourceLoc, "min/max", aggType.serialize(), typeTag.serialize());
             }
         } else {
-
             // If a system_null is encountered locally, it would be an error; otherwise if it is seen
             // by a global aggregator, it is simple ignored.
             if (typeTag == ATypeTag.SYSTEM_NULL) {
                 processSystemNull();
                 return;
             }
-
+            if (aggType == typeTag) {
+                compareAndUpdate(cmp, inputVal, outputVal);
+                return;
+            }
             if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
-                aggType = typeTag;
-                cmp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, isMin)
+                // switch to new comp & aggregation type (i.e. current min/max is int and new input is double)
+                cmp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(typeTag, isMin)
                         .createBinaryComparator();
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.convertType(outputVal.getByteArray(), outputVal.getStartOffset() + 1,
-                                outputVal.getLength() - 1, tempValForCasting.getDataOutput());
-                    } catch (IOException e) {
-                        throw HyracksDataException.create(e);
-                    }
-                    outputVal.assign(tempValForCasting);
-                }
-                if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
-                        outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                    outputVal.assign(inputVal);
-                }
-
+                castValue(ATypeHierarchy.getTypePromoteComputer(aggType, typeTag), outputVal, tempValForCasting);
+                outputVal.assign(tempValForCasting);
+                compareAndUpdate(cmp, inputVal, outputVal);
+                aggType = typeTag;
             } else {
-                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.convertType(inputVal.getByteArray(), inputVal.getStartOffset() + 1,
-                                inputVal.getLength() - 1, tempValForCasting.getDataOutput());
-                    } catch (IOException e) {
-                        throw HyracksDataException.create(e);
-                    }
-                    if (cmp.compare(tempValForCasting.getByteArray(), tempValForCasting.getStartOffset(),
-                            tempValForCasting.getLength(), outputVal.getByteArray(), outputVal.getStartOffset(),
-                            outputVal.getLength()) < 0) {
-                        outputVal.assign(tempValForCasting);
-                    }
-                } else {
-                    if (cmp.compare(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength(),
-                            outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                        outputVal.assign(inputVal);
-                    }
-                }
-
+                castValue(ATypeHierarchy.getTypePromoteComputer(typeTag, aggType), inputVal, tempValForCasting);
+                compareAndUpdate(cmp, tempValForCasting, outputVal);
             }
         }
     }
@@ -153,20 +120,17 @@
         resultStorage.reset();
         try {
             switch (aggType) {
-                case NULL: {
+                case NULL:
                     resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                     result.set(resultStorage);
                     break;
-                }
-                case SYSTEM_NULL: {
+                case SYSTEM_NULL:
                     finishSystemNull();
                     result.set(resultStorage);
                     break;
-                }
-                default: {
+                default:
                     result.set(outputVal);
                     break;
-                }
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
@@ -187,4 +151,23 @@
     protected abstract void processSystemNull() throws HyracksDataException;
 
     protected abstract void finishSystemNull() throws IOException;
+
+    private static void compareAndUpdate(IBinaryComparator comp, IPointable newVal, ArrayBackedValueStorage oldVal)
+            throws HyracksDataException {
+        if (comp.compare(newVal.getByteArray(), newVal.getStartOffset(), newVal.getLength(), oldVal.getByteArray(),
+                oldVal.getStartOffset(), oldVal.getLength()) < 0) {
+            oldVal.assign(newVal);
+        }
+    }
+
+    private static void castValue(ITypeConvertComputer typeConverter, IPointable inputValue,
+            ArrayBackedValueStorage tempValForCasting) throws HyracksDataException {
+        tempValForCasting.reset();
+        try {
+            typeConverter.convertType(inputValue.getByteArray(), inputValue.getStartOffset() + 1,
+                    inputValue.getLength() - 1, tempValForCasting.getDataOutput());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
 }