[FUN] Use instance comparators for array functions
- user model changes: no
- storage format changes: no
- interface changes: no
details:
This patch is to make the array functions use instance
comparators instead of a static comparator to avoid
issues caused by parallelism.
Change-Id: Ic03ef8dc1d678d61999917f975bf9d7f301f873f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2816
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
index b74ec6a..4ac1f8dc 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
@@ -28,6 +28,7 @@
import org.apache.asterix.builders.IAsterixListBuilder;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -44,6 +45,7 @@
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -94,6 +96,7 @@
private final SourceLocation sourceLoc;
private final IBinaryHashFunction binaryHashFunction;
private final Int2ObjectMap<List<IPointable>> hashes;
+ private final IBinaryComparator comp;
private IPointable item;
private ArrayBackedValueStorage storage;
@@ -102,6 +105,7 @@
super(args, ctx, inputListType);
this.sourceLoc = sourceLoc;
hashes = new Int2ObjectOpenHashMap<>();
+ comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
item = pointableAllocator.allocateEmpty();
storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
@@ -138,7 +142,7 @@
addItem(item, listBuilder, itemInStorage, sameHashes);
hashes.put(hash, sameHashes);
item = pointableAllocator.allocateEmpty();
- } else if (ArrayFunctionsUtil.findItem(item, sameHashes) == null) {
+ } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) == null) {
// new item, it could happen that two hashes are the same but they are for different items
addItem(item, listBuilder, itemInStorage, sameHashes);
item = pointableAllocator.allocateEmpty();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
index 1540ecb..706cb53 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
@@ -33,6 +33,7 @@
import org.apache.asterix.builders.UnorderedListBuilder;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
@@ -56,6 +57,7 @@
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
@@ -140,6 +142,7 @@
private final IObjectPool<List<ValueListIndex>, ATypeTag> arrayListAllocator;
private final ArrayBackedValueStorage finalResult;
private final CastTypeEvaluator caster;
+ private final IBinaryComparator comp;
private IAsterixListBuilder orderedListBuilder;
private IAsterixListBuilder unorderedListBuilder;
@@ -153,6 +156,7 @@
finalResult = new ArrayBackedValueStorage();
listAccessor = new ListAccessor();
caster = new CastTypeEvaluator();
+ comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
listsArgs = new IPointable[args.length];
listsEval = new IScalarEvaluator[args.length];
for (int i = 0; i < args.length; i++) {
@@ -303,7 +307,7 @@
newHashes.add(new ValueListIndex(item, -1));
hashes.put(hash, newHashes);
return true;
- } else if (ArrayFunctionsUtil.findItem(item, sameHashes) == null) {
+ } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) == null) {
sameHashes.add(new ValueListIndex(item, -1));
return true;
}
@@ -341,7 +345,7 @@
private void incrementIfExists(List<ValueListIndex> sameHashes, IPointable item, int listIndex,
IAsterixListBuilder listBuilder) throws HyracksDataException {
- ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item, sameHashes);
+ ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item, sameHashes, comp);
if (sameValue != null && listIndex - sameValue.listIndex == 1) {
// found the item, its stamp is OK (stamp saves the last list index that has seen this item)
// increment stamp of this item
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
index f99fc4c..12ae2fd 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
@@ -49,7 +49,6 @@
public class ArraySortDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- private static final ArraySortComparator COMP = new ArraySortComparator();
private IAType inputListType;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@@ -83,12 +82,12 @@
@Override
public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
- return new ArraySortFunction(args, ctx, sourceLoc);
+ return new ArraySortEval(args, ctx, sourceLoc);
}
};
}
- private static class ArraySortComparator implements Comparator<IPointable> {
+ protected class ArraySortComparator implements Comparator<IPointable> {
private final IBinaryComparator comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
@Override
@@ -102,19 +101,19 @@
}
}
- public class ArraySortFunction extends AbstractArrayProcessEval {
+ public class ArraySortEval extends AbstractArrayProcessEval {
private final SourceLocation sourceLoc;
private final PriorityQueue<IPointable> sortedList;
private IPointable item;
private ArrayBackedValueStorage storage;
- public ArraySortFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLoc)
+ public ArraySortEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLoc)
throws HyracksDataException {
super(args, ctx, inputListType);
this.sourceLoc = sourceLoc;
item = pointableAllocator.allocateEmpty();
storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
- sortedList = new PriorityQueue<>(COMP);
+ sortedList = new PriorityQueue<>(new ArraySortComparator());
}
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
index 4ff9c30..45b9fdc 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
@@ -24,6 +24,7 @@
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.asterix.builders.ArrayListFactory;
import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
@@ -33,6 +34,7 @@
import org.apache.asterix.runtime.utils.ArrayFunctionsUtil;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -44,6 +46,7 @@
private final Int2ObjectMap<List<ValueCounter>> hashes;
private final IObjectPool<List<ValueCounter>, ATypeTag> arrayListAllocator;
private final IObjectPool<ValueCounter, ATypeTag> valueCounterAllocator;
+ private final IBinaryComparator comp;
public ArraySymDiffEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLocation,
IAType[] argTypes) throws HyracksDataException {
@@ -51,6 +54,7 @@
arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
valueCounterAllocator = new ListObjectPool<>(new ValueCounterFactory());
hashes = new Int2ObjectOpenHashMap<>();
+ comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
.createBinaryHashFunction();
}
@@ -136,7 +140,7 @@
return true;
} else {
// potentially, item already exists
- ValueCounter itemListIdxCounter = ArrayFunctionsUtil.findItem(item, sameHashes);
+ ValueCounter itemListIdxCounter = ArrayFunctionsUtil.findItem(item, sameHashes, comp);
if (itemListIdxCounter == null) {
// new item
addItem(item, listIndex, sameHashes);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
index ccb86c1..0a0a6ef 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
@@ -24,6 +24,7 @@
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.asterix.builders.ArrayListFactory;
import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -41,6 +42,7 @@
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
@@ -106,11 +108,13 @@
private final IObjectPool<List<IPointable>, ATypeTag> pointableListAllocator;
private final IBinaryHashFunction binaryHashFunction;
private final Int2ObjectMap<List<IPointable>> hashes;
+ private final IBinaryComparator comp;
public ArrayUnionEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
super(args, ctx, true, sourceLoc, argTypes);
pointableListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
hashes = new Int2ObjectOpenHashMap<>();
+ comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
.createBinaryHashFunction();
}
@@ -142,7 +146,7 @@
addItem(listBuilder, item, sameHashes);
hashes.put(hash, sameHashes);
return true;
- } else if (ArrayFunctionsUtil.findItem(item, sameHashes) == null) {
+ } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) == null) {
// new item, it could happen that two hashes are the same but they are for different items
addItem(listBuilder, item, sameHashes);
return true;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ArrayFunctionsUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ArrayFunctionsUtil.java
index 37ac692a..14f102c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ArrayFunctionsUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ArrayFunctionsUtil.java
@@ -20,24 +20,21 @@
import java.util.List;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
public class ArrayFunctionsUtil {
- private static final IBinaryComparator COMP = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-
private ArrayFunctionsUtil() {
}
- public static <T extends IValueReference> T findItem(IValueReference item, List<T> sameHashes)
- throws HyracksDataException {
+ public static <T extends IValueReference> T findItem(IValueReference item, List<T> sameHashes,
+ IBinaryComparator comp) throws HyracksDataException {
T sameItem;
for (int k = 0; k < sameHashes.size(); k++) {
sameItem = sameHashes.get(k);
- if (COMP.compare(item.getByteArray(), item.getStartOffset(), item.getLength(), sameItem.getByteArray(),
+ if (comp.compare(item.getByteArray(), item.getStartOffset(), item.getLength(), sameItem.getByteArray(),
sameItem.getStartOffset(), sameItem.getLength()) == 0) {
return sameItem;
}