[FUN] Use instance comparators for array functions

- user model changes: no
- storage format changes: no
- interface changes: no

details:
This patch is to make the array functions use instance
comparators instead of a static comparator to avoid
issues caused by parallelism.

Change-Id: Ic03ef8dc1d678d61999917f975bf9d7f301f873f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2816
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
index b74ec6a..4ac1f8dc 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.builders.IAsterixListBuilder;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -44,6 +45,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -94,6 +96,7 @@
         private final SourceLocation sourceLoc;
         private final IBinaryHashFunction binaryHashFunction;
         private final Int2ObjectMap<List<IPointable>> hashes;
+        private final IBinaryComparator comp;
         private IPointable item;
         private ArrayBackedValueStorage storage;
 
@@ -102,6 +105,7 @@
             super(args, ctx, inputListType);
             this.sourceLoc = sourceLoc;
             hashes = new Int2ObjectOpenHashMap<>();
+            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
             item = pointableAllocator.allocateEmpty();
             storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
             binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
@@ -138,7 +142,7 @@
                         addItem(item, listBuilder, itemInStorage, sameHashes);
                         hashes.put(hash, sameHashes);
                         item = pointableAllocator.allocateEmpty();
-                    } else if (ArrayFunctionsUtil.findItem(item, sameHashes) == null) {
+                    } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) == null) {
                         // new item, it could happen that two hashes are the same but they are for different items
                         addItem(item, listBuilder, itemInStorage, sameHashes);
                         item = pointableAllocator.allocateEmpty();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
index 1540ecb..706cb53 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.builders.UnorderedListBuilder;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
@@ -56,6 +57,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
@@ -140,6 +142,7 @@
         private final IObjectPool<List<ValueListIndex>, ATypeTag> arrayListAllocator;
         private final ArrayBackedValueStorage finalResult;
         private final CastTypeEvaluator caster;
+        private final IBinaryComparator comp;
         private IAsterixListBuilder orderedListBuilder;
         private IAsterixListBuilder unorderedListBuilder;
 
@@ -153,6 +156,7 @@
             finalResult = new ArrayBackedValueStorage();
             listAccessor = new ListAccessor();
             caster = new CastTypeEvaluator();
+            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
             listsArgs = new IPointable[args.length];
             listsEval = new IScalarEvaluator[args.length];
             for (int i = 0; i < args.length; i++) {
@@ -303,7 +307,7 @@
                 newHashes.add(new ValueListIndex(item, -1));
                 hashes.put(hash, newHashes);
                 return true;
-            } else if (ArrayFunctionsUtil.findItem(item, sameHashes) == null) {
+            } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) == null) {
                 sameHashes.add(new ValueListIndex(item, -1));
                 return true;
             }
@@ -341,7 +345,7 @@
 
         private void incrementIfExists(List<ValueListIndex> sameHashes, IPointable item, int listIndex,
                 IAsterixListBuilder listBuilder) throws HyracksDataException {
-            ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item, sameHashes);
+            ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item, sameHashes, comp);
             if (sameValue != null && listIndex - sameValue.listIndex == 1) {
                 // found the item, its stamp is OK (stamp saves the last list index that has seen this item)
                 // increment stamp of this item
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
index f99fc4c..12ae2fd 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
@@ -49,7 +49,6 @@
 
 public class ArraySortDescriptor extends AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
-    private static final ArraySortComparator COMP = new ArraySortComparator();
     private IAType inputListType;
 
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -83,12 +82,12 @@
 
             @Override
             public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
-                return new ArraySortFunction(args, ctx, sourceLoc);
+                return new ArraySortEval(args, ctx, sourceLoc);
             }
         };
     }
 
-    private static class ArraySortComparator implements Comparator<IPointable> {
+    protected class ArraySortComparator implements Comparator<IPointable> {
         private final IBinaryComparator comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
 
         @Override
@@ -102,19 +101,19 @@
         }
     }
 
-    public class ArraySortFunction extends AbstractArrayProcessEval {
+    public class ArraySortEval extends AbstractArrayProcessEval {
         private final SourceLocation sourceLoc;
         private final PriorityQueue<IPointable> sortedList;
         private IPointable item;
         private ArrayBackedValueStorage storage;
 
-        public ArraySortFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLoc)
+        public ArraySortEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLoc)
                 throws HyracksDataException {
             super(args, ctx, inputListType);
             this.sourceLoc = sourceLoc;
             item = pointableAllocator.allocateEmpty();
             storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
-            sortedList = new PriorityQueue<>(COMP);
+            sortedList = new PriorityQueue<>(new ArraySortComparator());
         }
 
         @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
index 4ff9c30..45b9fdc 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
@@ -24,6 +24,7 @@
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.asterix.builders.ArrayListFactory;
 import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
@@ -33,6 +34,7 @@
 import org.apache.asterix.runtime.utils.ArrayFunctionsUtil;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -44,6 +46,7 @@
     private final Int2ObjectMap<List<ValueCounter>> hashes;
     private final IObjectPool<List<ValueCounter>, ATypeTag> arrayListAllocator;
     private final IObjectPool<ValueCounter, ATypeTag> valueCounterAllocator;
+    private final IBinaryComparator comp;
 
     public ArraySymDiffEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLocation,
             IAType[] argTypes) throws HyracksDataException {
@@ -51,6 +54,7 @@
         arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
         valueCounterAllocator = new ListObjectPool<>(new ValueCounterFactory());
         hashes = new Int2ObjectOpenHashMap<>();
+        comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
         binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
                 .createBinaryHashFunction();
     }
@@ -136,7 +140,7 @@
             return true;
         } else {
             // potentially, item already exists
-            ValueCounter itemListIdxCounter = ArrayFunctionsUtil.findItem(item, sameHashes);
+            ValueCounter itemListIdxCounter = ArrayFunctionsUtil.findItem(item, sameHashes, comp);
             if (itemListIdxCounter == null) {
                 // new item
                 addItem(item, listIndex, sameHashes);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
index ccb86c1..0a0a6ef 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
@@ -24,6 +24,7 @@
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.asterix.builders.ArrayListFactory;
 import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -41,6 +42,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
@@ -106,11 +108,13 @@
         private final IObjectPool<List<IPointable>, ATypeTag> pointableListAllocator;
         private final IBinaryHashFunction binaryHashFunction;
         private final Int2ObjectMap<List<IPointable>> hashes;
+        private final IBinaryComparator comp;
 
         public ArrayUnionEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
             super(args, ctx, true, sourceLoc, argTypes);
             pointableListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
             hashes = new Int2ObjectOpenHashMap<>();
+            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
             binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
                     .createBinaryHashFunction();
         }
@@ -142,7 +146,7 @@
                 addItem(listBuilder, item, sameHashes);
                 hashes.put(hash, sameHashes);
                 return true;
-            } else if (ArrayFunctionsUtil.findItem(item, sameHashes) == null) {
+            } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) == null) {
                 // new item, it could happen that two hashes are the same but they are for different items
                 addItem(listBuilder, item, sameHashes);
                 return true;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ArrayFunctionsUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ArrayFunctionsUtil.java
index 37ac692a..14f102c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ArrayFunctionsUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ArrayFunctionsUtil.java
@@ -20,24 +20,21 @@
 
 import java.util.List;
 
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 
 public class ArrayFunctionsUtil {
 
-    private static final IBinaryComparator COMP = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-
     private ArrayFunctionsUtil() {
     }
 
-    public static <T extends IValueReference> T findItem(IValueReference item, List<T> sameHashes)
-            throws HyracksDataException {
+    public static <T extends IValueReference> T findItem(IValueReference item, List<T> sameHashes,
+            IBinaryComparator comp) throws HyracksDataException {
         T sameItem;
         for (int k = 0; k < sameHashes.size(); k++) {
             sameItem = sameHashes.get(k);
-            if (COMP.compare(item.getByteArray(), item.getStartOffset(), item.getLength(), sameItem.getByteArray(),
+            if (comp.compare(item.getByteArray(), item.getStartOffset(), item.getLength(), sameItem.getByteArray(),
                     sameItem.getStartOffset(), sameItem.getLength()) == 0) {
                 return sameItem;
             }