[ASTERIXDB-2516][COMP] Avoid writing field names & values when comparing records
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
ARecordVisitablePointable writes field names, field tags, and field values
first before giving access to the record information. This is not ideal for
comparison. A different record accessor is needed for comparison. Also, the
field names should be sorted which ARecordVisitablePointable does not provide.
- avoid this writing when a pointable to the name & value can be obtained
(especially when the field value already includes the tag)
- use UTF8Pointable cached values (string length, meta length) to compare
instead of using the string comparator which would recalculate these values
- refactored some common code.
- removed not used methods in ARecordPointable
Change-Id: I19ac95a91749b2983bf06f763e463521a97a261c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3280
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java
deleted file mode 100644
index a42e5aa..0000000
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.dataflow.data.nontagged;
-
-import static org.apache.asterix.om.types.ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
-import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
-
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class CompareHashUtil {
-
- private CompareHashUtil() {
- }
-
- public static Comparator<IVisitablePointable> createFieldNamesComp(IBinaryComparator stringComp) {
- return new Comparator<IVisitablePointable>() {
- @Override
- public int compare(IVisitablePointable name1, IVisitablePointable name2) {
- try {
- return stringComp.compare(name1.getByteArray(), name1.getStartOffset() + 1, name1.getLength() - 1,
- name2.getByteArray(), name2.getStartOffset() + 1, name2.getLength() - 1);
- } catch (HyracksDataException e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- public static int addToHeap(List<IVisitablePointable> recordFNames, List<IVisitablePointable> recordFValues,
- PriorityQueue<IVisitablePointable> names) {
- // do not add fields whose value is missing, they don't exist in reality
- int length = recordFNames.size();
- IVisitablePointable fieldValue;
- int count = 0;
- for (int i = 0; i < length; i++) {
- fieldValue = recordFValues.get(i);
- if (fieldValue.getByteArray()[fieldValue.getStartOffset()] != SERIALIZED_MISSING_TYPE_TAG) {
- names.add(recordFNames.get(i));
- count++;
- }
- }
- return count;
- }
-
- public static int getIndex(List<IVisitablePointable> names, IVisitablePointable instance) {
- int size = names.size();
- for (int i = 0; i < size; i++) {
- if (instance == names.get(i)) {
- return i;
- }
- }
- throw new IllegalStateException();
- }
-
- public static IAType getType(ARecordType recordType, int fieldIdx, ATypeTag fieldTag) throws HyracksDataException {
- IAType[] fieldTypes = recordType.getFieldTypes();
- if (fieldIdx >= fieldTypes.length) {
- return fieldTag.isDerivedType() ? DefaultOpenFieldType.getDefaultOpenFieldType(fieldTag)
- : TypeTagUtil.getBuiltinTypeByTag(fieldTag);
- }
- return fieldTypes[fieldIdx];
- }
-}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java
index 098c81f..6d7ad76 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java
@@ -18,23 +18,20 @@
*/
package org.apache.asterix.dataflow.data.nontagged.comparators;
-import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING;
+import static org.apache.asterix.om.util.container.ObjectFactories.RECORD_FACTORY;
+import static org.apache.asterix.om.util.container.ObjectFactories.STORAGE_FACTORY;
+import static org.apache.asterix.om.util.container.ObjectFactories.VOID_FACTORY;
import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
import org.apache.asterix.dataflow.data.common.ListAccessorUtil;
-import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ADayTimeDurationSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AYearMonthDurationSerializerDeserializer;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.nonvisitor.RecordField;
+import org.apache.asterix.om.pointables.nonvisitor.SortedRecord;
import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -43,9 +40,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
import org.apache.asterix.om.types.hierachy.ATypeHierarchy.Domain;
-import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.asterix.om.util.container.ListObjectPool;
-import org.apache.asterix.om.util.container.ObjectFactories;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -61,64 +56,43 @@
*/
abstract class AbstractAGenericBinaryComparator implements IBinaryComparator {
- // BOOLEAN
private final IBinaryComparator ascBoolComp = BooleanBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // STRING
private final IBinaryComparator ascStrComp =
new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY).createBinaryComparator();
- // BINARY
private final IBinaryComparator ascByteArrayComp =
new PointableBinaryComparatorFactory(ByteArrayPointable.FACTORY).createBinaryComparator();
- // RECTANGLE
private final IBinaryComparator ascRectangleComp =
ARectanglePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // CIRCLE
private final IBinaryComparator ascCircleComp =
ACirclePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // DURATION
private final IBinaryComparator ascDurationComp =
ADurationPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // INTERVAL
private final IBinaryComparator ascIntervalComp =
AIntervalAscPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // LINE
private final IBinaryComparator ascLineComp = ALinePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // POINT
private final IBinaryComparator ascPointComp =
APointPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // POINT3D
private final IBinaryComparator ascPoint3DComp =
APoint3DPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // POLYGON
private final IBinaryComparator ascPolygonComp =
APolygonPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // UUID
private final IBinaryComparator ascUUIDComp = AUUIDPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // RAW
private final IBinaryComparator rawComp = RawBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- // these fields can be null
+ // the type fields can be null
protected final IAType leftType;
protected final IAType rightType;
- private final IObjectPool<IMutableValueStorage, Void> storageAllocator;
- private final IObjectPool<IPointable, Void> voidPointableAllocator;
- // used for record comparison, sorting field names
- private final PointableAllocator recordAllocator;
- private final IObjectPool<PriorityQueue<IVisitablePointable>, Void> heapAllocator;
- private final Comparator<IVisitablePointable> fieldNamesComparator;
+ private final ListObjectPool<IMutableValueStorage, Void> storageAllocator = new ListObjectPool<>(STORAGE_FACTORY);
+ private final ListObjectPool<IPointable, Void> voidPointableAllocator = new ListObjectPool<>(VOID_FACTORY);
+ private final ListObjectPool<SortedRecord, ARecordType> recordPool = new ListObjectPool<>(RECORD_FACTORY);
AbstractAGenericBinaryComparator(IAType leftType, IAType rightType) {
// factory should have already made sure to get the actual type
this.leftType = leftType;
this.rightType = rightType;
- this.storageAllocator = new ListObjectPool<>(ObjectFactories.STORAGE_FACTORY);
- this.voidPointableAllocator = new ListObjectPool<>(ObjectFactories.VOID_FACTORY);
- this.recordAllocator = new PointableAllocator();
- this.fieldNamesComparator = CompareHashUtil.createFieldNamesComp(ascStrComp);
- this.heapAllocator = new ListObjectPool<>((type) -> new PriorityQueue<>(fieldNamesComparator));
}
- protected int compare(IAType leftType, byte[] b1, int s1, int l1, IAType rightType, byte[] b2, int s2, int l2)
+ protected final int compare(IAType leftType, byte[] b1, int s1, int l1, IAType rightType, byte[] b2, int s2, int l2)
throws HyracksDataException {
if (b1[s1] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
return b2[s2] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG ? 0 : -1;
@@ -132,16 +106,14 @@
}
ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]);
ATypeTag tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]);
- // if one of tag is null, that means we are dealing with an empty byte array in one side.
- // and, we don't need to continue. We just compare raw byte by byte.
+ // tag being null could mean several things among of which is that the passed args are not tagged
if (tag1 == null || tag2 == null) {
return rawComp.compare(b1, s1, l1, b2, s2, l2);
}
if (ATypeHierarchy.isCompatible(tag1, tag2) && ATypeHierarchy.getTypeDomain(tag1) == Domain.NUMERIC) {
return ComparatorUtil.compareNumbers(tag1, b1, s1 + 1, tag2, b2, s2 + 1);
}
- // currently only numbers are compatible. if two tags are not compatible, we compare the tags.
- // this is especially useful when we need to generate some order between any two types.
+ // currently only numbers are compatible. if two tags are not compatible, we compare the tags to generate order
if (tag1 != tag2) {
return Byte.compare(b1[s1], b2[s2]);
}
@@ -191,7 +163,6 @@
case OBJECT:
return compareRecords(leftType, b1, s1, l1, rightType, b2, s2, l2);
default:
- // we include typeTag in comparison to compare between two type to enforce some ordering
return rawComp.compare(b1, s1, l1, b2, s2, l2);
}
}
@@ -248,53 +219,34 @@
}
ARecordType leftRecordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(leftType, ATypeTag.OBJECT);
ARecordType rightRecordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(rightType, ATypeTag.OBJECT);
- ARecordVisitablePointable leftRecord = recordAllocator.allocateRecordValue(leftRecordType);
- ARecordVisitablePointable rightRecord = recordAllocator.allocateRecordValue(rightRecordType);
- PriorityQueue<IVisitablePointable> leftNamesHeap = null, rightNamesHeap = null;
+ SortedRecord leftRecord = recordPool.allocate(leftRecordType);
+ SortedRecord rightRecord = recordPool.allocate(rightRecordType);
+ IPointable leftFieldValue = voidPointableAllocator.allocate(null);
+ IPointable rightFieldValue = voidPointableAllocator.allocate(null);
+ // TODO(ali): this is not ideal. should be removed when tagged pointables are introduced
+ ArrayBackedValueStorage leftStorage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
+ ArrayBackedValueStorage rightStorage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
try {
- leftRecord.set(b1, s1, l1);
- rightRecord.set(b2, s2, l2);
- List<IVisitablePointable> leftFieldsNames = leftRecord.getFieldNames();
- List<IVisitablePointable> rightFieldsNames = rightRecord.getFieldNames();
- List<IVisitablePointable> leftFieldsValues = leftRecord.getFieldValues();
- List<IVisitablePointable> rightFieldsValues = rightRecord.getFieldValues();
- leftNamesHeap = heapAllocator.allocate(null);
- rightNamesHeap = heapAllocator.allocate(null);
- leftNamesHeap.clear();
- rightNamesHeap.clear();
- int numLeftValuedFields = CompareHashUtil.addToHeap(leftFieldsNames, leftFieldsValues, leftNamesHeap);
- int numRightValuedFields = CompareHashUtil.addToHeap(rightFieldsNames, rightFieldsValues, rightNamesHeap);
- if (numLeftValuedFields == 0 && numRightValuedFields == 0) {
- return 0;
- } else if (numLeftValuedFields == 0) {
- return -1;
- } else if (numRightValuedFields == 0) {
- return 1;
- }
- int result;
- int leftFieldIdx, rightFieldIdx;
+ leftRecord.reset(b1, s1);
+ rightRecord.reset(b2, s2);
IAType leftFieldType, rightFieldType;
- IVisitablePointable leftFieldName, leftFieldValue, rightFieldName, rightFieldValue;
- ATypeTag fieldTag;
- while (!leftNamesHeap.isEmpty() && !rightNamesHeap.isEmpty()) {
- leftFieldName = leftNamesHeap.poll();
- rightFieldName = rightNamesHeap.poll();
+ RecordField leftField, rightField;
+ int result;
+ while (!leftRecord.isEmpty() && !rightRecord.isEmpty()) {
+ leftField = leftRecord.poll();
+ rightField = rightRecord.poll();
// compare the names first
- result = ascStrComp.compare(leftFieldName.getByteArray(), leftFieldName.getStartOffset() + 1,
- leftFieldName.getLength() - 1, rightFieldName.getByteArray(),
- rightFieldName.getStartOffset() + 1, rightFieldName.getLength() - 1);
+ result = RecordField.FIELD_NAME_COMP.compare(leftField, rightField);
if (result != 0) {
return result;
}
// then compare the values if the names are equal
- leftFieldIdx = CompareHashUtil.getIndex(leftFieldsNames, leftFieldName);
- rightFieldIdx = CompareHashUtil.getIndex(rightFieldsNames, rightFieldName);
- leftFieldValue = leftFieldsValues.get(leftFieldIdx);
- rightFieldValue = rightFieldsValues.get(rightFieldIdx);
- fieldTag = VALUE_TYPE_MAPPING[leftFieldValue.getByteArray()[leftFieldValue.getStartOffset()]];
- leftFieldType = CompareHashUtil.getType(leftRecordType, leftFieldIdx, fieldTag);
- fieldTag = VALUE_TYPE_MAPPING[rightFieldValue.getByteArray()[rightFieldValue.getStartOffset()]];
- rightFieldType = CompareHashUtil.getType(rightRecordType, rightFieldIdx, fieldTag);
+ leftStorage.reset();
+ rightStorage.reset();
+ leftRecord.getFieldValue(leftField, leftFieldValue, leftStorage);
+ rightRecord.getFieldValue(rightField, rightFieldValue, rightStorage);
+ leftFieldType = leftRecord.getFieldType(leftField);
+ rightFieldType = rightRecord.getFieldType(rightField);
result = compare(leftFieldType, leftFieldValue.getByteArray(), leftFieldValue.getStartOffset(),
leftFieldValue.getLength(), rightFieldType, rightFieldValue.getByteArray(),
rightFieldValue.getStartOffset(), rightFieldValue.getLength());
@@ -302,17 +254,16 @@
return result;
}
}
-
- return Integer.compare(numLeftValuedFields, numRightValuedFields);
+ return Integer.compare(leftRecord.size(), rightRecord.size());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
} finally {
- recordAllocator.freeRecord(rightRecord);
- recordAllocator.freeRecord(leftRecord);
- if (rightNamesHeap != null) {
- heapAllocator.free(rightNamesHeap);
- }
- if (leftNamesHeap != null) {
- heapAllocator.free(leftNamesHeap);
- }
+ recordPool.free(rightRecord);
+ recordPool.free(leftRecord);
+ voidPointableAllocator.free(rightFieldValue);
+ voidPointableAllocator.free(leftFieldValue);
+ storageAllocator.free(rightStorage);
+ storageAllocator.free(leftStorage);
}
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java
index b83e248..aa902bb 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java
@@ -30,7 +30,6 @@
import org.apache.asterix.dataflow.data.common.ILogicalBinaryComparator;
import org.apache.asterix.dataflow.data.common.ListAccessorUtil;
-import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.pointables.ARecordVisitablePointable;
@@ -44,6 +43,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.asterix.om.utils.RecordUtil;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
@@ -275,8 +275,8 @@
if (leftFTag == ATypeTag.NULL || rightFTag == ATypeTag.NULL) {
tempCompResult = Result.NULL;
} else if (leftFTag.isDerivedType() && rightFTag.isDerivedType()) {
- leftFieldType = CompareHashUtil.getType(leftRecordType, i, leftFTag);
- rightFieldType = CompareHashUtil.getType(rightRecordType, k, rightFTag);
+ leftFieldType = RecordUtil.getType(leftRecordType, i, leftFTag);
+ rightFieldType = RecordUtil.getType(rightRecordType, k, rightFTag);
tempCompResult = compareComplex(leftFieldType, leftFTag, leftFieldValue,
rightFieldType, rightFTag, rightFieldValue);
} else {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java
index 2ef6e80..020fbd1 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java
@@ -18,19 +18,16 @@
*/
package org.apache.asterix.dataflow.data.nontagged.hash;
-import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING;
+import static org.apache.asterix.om.util.container.ObjectFactories.RECORD_FACTORY;
+import static org.apache.asterix.om.util.container.ObjectFactories.STORAGE_FACTORY;
+import static org.apache.asterix.om.util.container.ObjectFactories.VOID_FACTORY;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
import org.apache.asterix.dataflow.data.common.ListAccessorUtil;
-import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.nonvisitor.RecordField;
+import org.apache.asterix.om.pointables.nonvisitor.SortedRecord;
import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -41,16 +38,13 @@
import org.apache.asterix.om.types.hierachy.IntegerToDoubleTypeConvertComputer;
import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.asterix.om.util.container.ListObjectPool;
-import org.apache.asterix.om.util.container.ObjectFactories;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHash;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public class AMurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
@@ -83,24 +77,15 @@
private final ArrayBackedValueStorage valueBuffer = new ArrayBackedValueStorage();
private final DataOutput valueOut = valueBuffer.getDataOutput();
- private final IObjectPool<IPointable, Void> voidPointableAllocator;
- private final IObjectPool<IMutableValueStorage, Void> storageAllocator;
+ private final IObjectPool<IPointable, Void> voidPointableAllocator = new ListObjectPool<>(VOID_FACTORY);
+ private final IObjectPool<IMutableValueStorage, Void> storageAllocator = new ListObjectPool<>(STORAGE_FACTORY);
+ private final IObjectPool<SortedRecord, ARecordType> recordPool = new ListObjectPool<>(RECORD_FACTORY);
private final IAType type;
private final int seed;
- // used for record hashing, sorting field names first
- private final PointableAllocator recordAllocator;
- private final IObjectPool<PriorityQueue<IVisitablePointable>, Void> heapAllocator;
- private final Comparator<IVisitablePointable> fieldNamesComparator;
private GenericHashFunction(IAType type, int seed) {
this.type = type;
this.seed = seed;
- this.voidPointableAllocator = new ListObjectPool<>(ObjectFactories.VOID_FACTORY);
- this.storageAllocator = new ListObjectPool<>(ObjectFactories.STORAGE_FACTORY);
- this.recordAllocator = new PointableAllocator();
- this.fieldNamesComparator = CompareHashUtil.createFieldNamesComp(
- new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY).createBinaryComparator());
- this.heapAllocator = new ListObjectPool<>((arg) -> new PriorityQueue<>(fieldNamesComparator));
}
@Override
@@ -178,35 +163,28 @@
return MurmurHash3BinaryHash.hash(bytes, offset, length, seed);
}
ARecordType recordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(type, ATypeTag.OBJECT);
- ARecordVisitablePointable record = recordAllocator.allocateRecordValue(recordType);
- PriorityQueue<IVisitablePointable> namesHeap = heapAllocator.allocate(null);
+ SortedRecord record = recordPool.allocate(recordType);
+ IPointable fieldValue = voidPointableAllocator.allocate(null);
+ // TODO(ali): this is not ideal. should be removed when tagged pointables are introduced
+ ArrayBackedValueStorage storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
try {
- record.set(bytes, offset, length);
- namesHeap.clear();
- List<IVisitablePointable> fieldsNames = record.getFieldNames();
- List<IVisitablePointable> fieldsValues = record.getFieldValues();
- CompareHashUtil.addToHeap(fieldsNames, fieldsValues, namesHeap);
- IVisitablePointable fieldName, fieldValue;
- IAType fieldType;
- ATypeTag fieldTag;
+ record.reset(bytes, offset);
int hash = seed;
- int fieldIdx;
- while (!namesHeap.isEmpty()) {
- fieldName = namesHeap.poll();
- // TODO(ali): currently doing another lookup to find the target field index and get its value & type
- fieldIdx = CompareHashUtil.getIndex(fieldsNames, fieldName);
- fieldValue = fieldsValues.get(fieldIdx);
- fieldTag = VALUE_TYPE_MAPPING[fieldValue.getByteArray()[fieldValue.getStartOffset()]];
- fieldType = CompareHashUtil.getType(recordType, fieldIdx, fieldTag);
- hash ^= MurmurHash3BinaryHash.hash(fieldName.getByteArray(), fieldName.getStartOffset(),
- fieldName.getLength(), seed)
- ^ hash(fieldType, fieldValue.getByteArray(), fieldValue.getStartOffset(),
- fieldValue.getLength());
+ while (!record.isEmpty()) {
+ RecordField field = record.poll();
+ storage.reset();
+ record.getFieldValue(field, fieldValue, storage);
+ IAType fieldType = record.getFieldType(field);
+ hash ^= field.getName().hash() ^ hash(fieldType, fieldValue.getByteArray(),
+ fieldValue.getStartOffset(), fieldValue.getLength());
}
return hash;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
} finally {
- recordAllocator.freeRecord(record);
- heapAllocator.free(namesHeap);
+ recordPool.free(record);
+ voidPointableAllocator.free(fieldValue);
+ storageAllocator.free(storage);
}
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
index 7089fd6..193ff7d 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
@@ -34,6 +34,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.container.IObjectFactory;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
import org.apache.asterix.om.utils.ResettableByteArrayOutputStream;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.string.UTF8StringWriter;
@@ -197,14 +198,12 @@
for (int fieldNumber = 0; fieldNumber < numberOfSchemaFields; fieldNumber++) {
if (hasOptionalFields) {
byte b1 = b[nullBitMapOffset + fieldNumber / 4];
- int p = 1 << (7 - 2 * (fieldNumber % 4));
- if ((b1 & p) == 0) {
+ if (RecordUtil.isNull(b1, fieldNumber)) {
// set null value (including type tag inside)
fieldValues.add(nullReference);
continue;
}
- p = 1 << (7 - 2 * (fieldNumber % 4) - 1);
- if ((b1 & p) == 0) {
+ if (RecordUtil.isMissing(b1, fieldNumber)) {
// set missing value (including type tag inside)
fieldValues.add(missingReference);
continue;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 4ffbf47..8820ce2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -46,16 +46,16 @@
import com.fasterxml.jackson.databind.JsonNode;
-/*
+/**
* This class interprets the binary data representation of a record.
*
* Record {
* byte tag;
* int length;
- * byte isExpanded;
+ * byte isExpanded?;
* int openOffset?;
* int numberOfClosedFields;
- * byte[ceil (numberOfFields / 4)] nullBitMap; // 1 bit per field, "1" means field is Null for this record
+ * byte[ceil (numberOfFields / 4)] nullBitMap; // 2 bits per field, "1" means field is null, "2" field is missing
* int[numberOfClosedFields] closedFieldOffset;
* IPointable[numberOfClosedFields] fieldValue;
* int numberOfOpenFields?;
@@ -107,20 +107,14 @@
}
- public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new IObjectFactory<IPointable, ATypeTag>() {
- @Override
- public IPointable create(ATypeTag type) {
- return new ARecordPointable();
- }
- };
-
- private static final int TAG_SIZE = 1;
- private static final int RECORD_LENGTH_SIZE = 4;
- private static final int EXPANDED_SIZE = 1;
- private static final int OPEN_OFFSET_SIZE = 4;
- private static final int CLOSED_COUNT_SIZE = 4;
- private static final int FIELD_OFFSET_SIZE = 4;
- private static final int OPEN_COUNT_SIZE = 4;
+ public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = type -> new ARecordPointable();
+ static final int TAG_SIZE = 1;
+ static final int RECORD_LENGTH_SIZE = 4;
+ static final int EXPANDED_SIZE = 1;
+ static final int OPEN_OFFSET_SIZE = 4;
+ static final int CLOSED_COUNT_SIZE = 4;
+ static final int FIELD_OFFSET_SIZE = 4;
+ static final int OPEN_COUNT_SIZE = 4;
private static final int OPEN_FIELD_HASH_SIZE = 4;
private static final int OPEN_FIELD_OFFSET_SIZE = 4;
private static final int OPEN_FIELD_HEADER = OPEN_FIELD_HASH_SIZE + OPEN_FIELD_OFFSET_SIZE;
@@ -129,97 +123,46 @@
return recordType == null || recordType.isOpen();
}
- public int getSchemeFieldCount(ARecordType recordType) {
+ public final int getSchemeFieldCount(ARecordType recordType) {
return recordType.getFieldNames().length;
}
- public byte getTag() {
- return BytePointable.getByte(bytes, getTagOffset());
- }
-
- public int getTagOffset() {
- return start;
- }
-
- public int getTagSize() {
- return TAG_SIZE;
- }
-
@Override
public int getLength() {
- return IntegerPointable.getInteger(bytes, getLengthOffset());
+ return IntegerPointable.getInteger(bytes, start + TAG_SIZE);
}
- public int getLengthOffset() {
- return getTagOffset() + getTagSize();
+ private boolean isExpanded(ARecordType recordType) {
+ return isOpen(recordType) && BooleanPointable.getBoolean(bytes, start + TAG_SIZE + RECORD_LENGTH_SIZE);
}
- public int getLengthSize() {
- return RECORD_LENGTH_SIZE;
+ private int getOpenPartOffset(ARecordType recordType) {
+ return start + TAG_SIZE + RECORD_LENGTH_SIZE + (isOpen(recordType) ? EXPANDED_SIZE : 0);
}
- public boolean isExpanded(ARecordType recordType) {
- if (isOpen(recordType)) {
- return BooleanPointable.getBoolean(bytes, getExpandedOffset(recordType));
- }
- return false;
+ private int getNullBitmapOffset(ARecordType recordType) {
+ return getOpenPartOffset(recordType) + (isExpanded(recordType) ? OPEN_OFFSET_SIZE : 0) + CLOSED_COUNT_SIZE;
}
- public int getExpandedOffset(ARecordType recordType) {
- return getLengthOffset() + getLengthSize();
- }
-
- public int getExpandedSize(ARecordType recordType) {
- return isOpen(recordType) ? EXPANDED_SIZE : 0;
- }
-
- public int getOpenPartOffset(ARecordType recordType) {
- return getExpandedOffset(recordType) + getExpandedSize(recordType);
- }
-
- public int getOpenPartSize(ARecordType recordType) {
- return isExpanded(recordType) ? OPEN_OFFSET_SIZE : 0;
- }
-
- public int getClosedFieldCount(ARecordType recordType) {
- return IntegerPointable.getInteger(bytes, getClosedFieldCountOffset(recordType));
- }
-
- public int getClosedFieldCountOffset(ARecordType recordType) {
- return getOpenPartOffset(recordType) + getOpenPartSize(recordType);
- }
-
- public int getClosedFieldCountSize(ARecordType recordType) {
- return CLOSED_COUNT_SIZE;
- }
-
- public int getNullBitmapOffset(ARecordType recordType) {
- return getClosedFieldCountOffset(recordType) + getClosedFieldCountSize(recordType);
- }
-
- public int getNullBitmapSize(ARecordType recordType) {
+ private int getNullBitmapSize(ARecordType recordType) {
return RecordUtil.computeNullBitmapSize(recordType);
}
public boolean isClosedFieldNull(ARecordType recordType, int fieldId) {
- if (getNullBitmapSize(recordType) > 0) {
- return (bytes[getNullBitmapOffset(recordType) + fieldId / 4] & (1 << (7 - 2 * (fieldId % 4)))) == 0;
- }
- return false;
+ return getNullBitmapSize(recordType) > 0
+ && RecordUtil.isNull(bytes[getNullBitmapOffset(recordType) + fieldId / 4], fieldId);
}
- public boolean isClosedFieldMissing(ARecordType recordType, int fieldId) {
- if (getNullBitmapSize(recordType) > 0) {
- return (bytes[getNullBitmapOffset(recordType) + fieldId / 4] & (1 << (7 - 2 * (fieldId % 4) - 1))) == 0;
- }
- return false;
+ private boolean isClosedFieldMissing(ARecordType recordType, int fieldId) {
+ return getNullBitmapSize(recordType) > 0
+ && RecordUtil.isMissing(bytes[getNullBitmapOffset(recordType) + fieldId / 4], fieldId);
}
// -----------------------
// Closed field accessors.
// -----------------------
- public void getClosedFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
+ public final void getClosedFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
if (isClosedFieldNull(recordType, fieldId)) {
dOut.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
} else if (isClosedFieldMissing(recordType, fieldId)) {
@@ -238,27 +181,28 @@
* @param pointable
* @throws IOException
*/
- public void getClosedFieldValue(ARecordType recordType, int fieldId, IPointable pointable) throws IOException {
+ public final void getClosedFieldValue(ARecordType recordType, int fieldId, IPointable pointable)
+ throws IOException {
if (isClosedFieldNull(recordType, fieldId) || isClosedFieldMissing(recordType, fieldId)) {
throw new IllegalStateException("Can't read a null or missing field");
}
pointable.set(bytes, getClosedFieldOffset(recordType, fieldId), getClosedFieldSize(recordType, fieldId));
}
- public String getClosedFieldName(ARecordType recordType, int fieldId) {
+ private String getClosedFieldName(ARecordType recordType, int fieldId) {
return recordType.getFieldNames()[fieldId];
}
- public void getClosedFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
+ public final void getClosedFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
dOut.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
utf8Writer.writeUTF8(getClosedFieldName(recordType, fieldId), dOut);
}
- public byte getClosedFieldTag(ARecordType recordType, int fieldId) {
+ public final byte getClosedFieldTag(ARecordType recordType, int fieldId) {
return getClosedFieldType(recordType, fieldId).getTypeTag().serialize();
}
- public IAType getClosedFieldType(ARecordType recordType, int fieldId) {
+ public final IAType getClosedFieldType(ARecordType recordType, int fieldId) {
IAType aType = recordType.getFieldTypes()[fieldId];
if (NonTaggedFormatUtil.isOptional(aType)) {
// optional field: add the embedded non-null type tag
@@ -267,7 +211,7 @@
return aType;
}
- public int getClosedFieldSize(ARecordType recordType, int fieldId) throws HyracksDataException {
+ public final int getClosedFieldSize(ARecordType recordType, int fieldId) throws HyracksDataException {
if (isClosedFieldNull(recordType, fieldId)) {
return 0;
}
@@ -275,7 +219,7 @@
getClosedFieldType(recordType, fieldId).getTypeTag(), false);
}
- public int getClosedFieldOffset(ARecordType recordType, int fieldId) {
+ public final int getClosedFieldOffset(ARecordType recordType, int fieldId) {
int offset = getNullBitmapOffset(recordType) + getNullBitmapSize(recordType) + fieldId * FIELD_OFFSET_SIZE;
return start + IntegerPointable.getInteger(bytes, offset);
}
@@ -284,15 +228,15 @@
// Open field count.
// -----------------------
- public int getOpenFieldCount(ARecordType recordType) {
+ public final int getOpenFieldCount(ARecordType recordType) {
return isExpanded(recordType) ? IntegerPointable.getInteger(bytes, getOpenFieldCountOffset(recordType)) : 0;
}
- public int getOpenFieldCountSize(ARecordType recordType) {
+ private int getOpenFieldCountSize(ARecordType recordType) {
return isExpanded(recordType) ? OPEN_COUNT_SIZE : 0;
}
- public int getOpenFieldCountOffset(ARecordType recordType) {
+ private int getOpenFieldCountOffset(ARecordType recordType) {
return start + IntegerPointable.getInteger(bytes, getOpenPartOffset(recordType));
}
@@ -300,67 +244,53 @@
// Open field accessors.
// -----------------------
- public void getOpenFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
+ public final void getOpenFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
dOut.write(bytes, getOpenFieldValueOffset(recordType, fieldId), getOpenFieldValueSize(recordType, fieldId));
}
- public int getOpenFieldValueOffset(ARecordType recordType, int fieldId) {
+ public final int getOpenFieldValueOffset(ARecordType recordType, int fieldId) {
return getOpenFieldNameOffset(recordType, fieldId) + getOpenFieldNameSize(recordType, fieldId);
}
- public int getOpenFieldValueSize(ARecordType recordType, int fieldId) throws HyracksDataException {
+ public final int getOpenFieldValueSize(ARecordType recordType, int fieldId) throws HyracksDataException {
int offset = getOpenFieldValueOffset(recordType, fieldId);
ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(getOpenFieldTag(recordType, fieldId));
return NonTaggedFormatUtil.getFieldValueLength(bytes, offset, tag, true);
}
- public void getOpenFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
+ public final void getOpenFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
dOut.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
dOut.write(bytes, getOpenFieldNameOffset(recordType, fieldId), getOpenFieldNameSize(recordType, fieldId));
}
- public String getOpenFieldName(ARecordType recordType, int fieldId) throws IOException {
+ public final String getOpenFieldName(ARecordType recordType, int fieldId) {
StringBuilder str = new StringBuilder();
int offset = getOpenFieldNameOffset(recordType, fieldId);
- UTF8StringUtil.toString(str, bytes, offset);
- String fieldName = str.toString();
- return fieldName;
+ return UTF8StringUtil.toString(str, bytes, offset).toString();
}
- public int getOpenFieldNameSize(ARecordType recordType, int fieldId) {
+ private int getOpenFieldNameSize(ARecordType recordType, int fieldId) {
int utfleng = UTF8StringUtil.getUTFLength(bytes, getOpenFieldNameOffset(recordType, fieldId));
return utfleng + UTF8StringUtil.getNumBytesToStoreLength(utfleng);
}
- public int getOpenFieldNameOffset(ARecordType recordType, int fieldId) {
+ private int getOpenFieldNameOffset(ARecordType recordType, int fieldId) {
return getOpenFieldOffset(recordType, fieldId);
}
- public byte getOpenFieldTag(ARecordType recordType, int fieldId) {
+ public final byte getOpenFieldTag(ARecordType recordType, int fieldId) {
return bytes[getOpenFieldValueOffset(recordType, fieldId)];
}
- public int getOpenFieldHash(ARecordType recordType, int fieldId) {
- return IntegerPointable.getInteger(bytes, getOpenFieldHashOffset(recordType, fieldId));
- }
-
- public int getOpenFieldHashOffset(ARecordType recordType, int fieldId) {
+ private int getOpenFieldHashOffset(ARecordType recordType, int fieldId) {
return getOpenFieldCountOffset(recordType) + getOpenFieldCountSize(recordType) + fieldId * OPEN_FIELD_HEADER;
}
- public int getOpenFieldHashSize(ARecordType recordType, int fieldId) {
- return OPEN_FIELD_HASH_SIZE;
- }
-
- public int getOpenFieldOffset(ARecordType recordType, int fieldId) {
+ private int getOpenFieldOffset(ARecordType recordType, int fieldId) {
return start + IntegerPointable.getInteger(bytes, getOpenFieldOffsetOffset(recordType, fieldId));
}
- public int getOpenFieldOffsetOffset(ARecordType recordType, int fieldId) {
- return getOpenFieldHashOffset(recordType, fieldId) + getOpenFieldHashSize(recordType, fieldId);
- }
-
- public int getOpenFieldOffsetSize(ARecordType recordType, int fieldId) {
- return OPEN_FIELD_HASH_SIZE;
+ private int getOpenFieldOffsetOffset(ARecordType recordType, int fieldId) {
+ return getOpenFieldHashOffset(recordType, fieldId) + OPEN_FIELD_HASH_SIZE;
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java
new file mode 100644
index 0000000..0cd16dc
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.pointables.nonvisitor;
+
+import java.util.Comparator;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+/**
+ * This class can be used to hold information about a field in a record. The {@link #index} variable is the position of
+ * the field in the record as laid out in the bytes of the record. The value pointed to by the {@link #valueOffset}
+ * could be tagged or non-tagged.
+ */
+public final class RecordField {
+
+ public static final Comparator<RecordField> FIELD_NAME_COMP =
+ (field1, field2) -> UTF8StringPointable.compare(field1.namePointable, field2.namePointable);
+ private UTF8StringPointable namePointable;
+ private ATypeTag valueTag;
+ private int index;
+ private int valueOffset;
+ private int valueLength;
+
+ RecordField() {
+ }
+
+ // for open field
+ final void set(UTF8StringPointable namePointable, int index, int valueOffset, int valueLength, ATypeTag valueTag) {
+ this.namePointable = namePointable;
+ set(index, valueOffset, valueLength, valueTag);
+ }
+
+ // for closed field where the name is already set
+ final void set(int index, int valueOffset, int valueLength, ATypeTag valueTag) {
+ this.index = index;
+ this.valueOffset = valueOffset;
+ this.valueTag = valueTag;
+ this.valueLength = valueLength;
+ }
+
+ public final UTF8StringPointable getName() {
+ return namePointable;
+ }
+
+ final int getIndex() {
+ return index;
+ }
+
+ final int getValueOffset() {
+ return valueOffset;
+ }
+
+ final int getValueLength() {
+ return valueLength;
+ }
+
+ final ATypeTag getValueTag() {
+ return valueTag;
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java
new file mode 100644
index 0000000..1d9ac6c
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.pointables.nonvisitor;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.asterix.om.util.container.ObjectFactories;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+/**
+ * This class is used to access the fields of a record in lexicographical order of the field names. The fields can be
+ * retrieved by calling {@link #poll()} which will poll the field with the minimum field name. The field value can be
+ * retrieved lazily by using the information stored in the {@link RecordField} returned from polling. Before
+ * accessing the record information, {@link #reset(byte[], int)} has to be called first to point to the bytes of the
+ * record in question.
+ */
+public final class SortedRecord {
+
+ private static final IObjectFactory<RecordField, Void> FIELD_FACTORY = type -> new RecordField();
+ // TODO(ali): copied from PointableHelper for now.
+ private static final byte[] NULL_BYTES = new byte[] { ATypeTag.SERIALIZED_NULL_TYPE_TAG };
+ private static final byte[] MISSING_BYTES = new byte[] { ATypeTag.SERIALIZED_MISSING_TYPE_TAG };
+ private final IObjectPool<UTF8StringPointable, Void> utf8Pool = new ListObjectPool<>(ObjectFactories.UTF8_FACTORY);
+ private final IObjectPool<RecordField, Void> recordFieldPool = new ListObjectPool<>(FIELD_FACTORY);
+ private final PriorityQueue<RecordField> sortedFields = new PriorityQueue<>(RecordField.FIELD_NAME_COMP);
+ private final ARecordType recordType;
+ private final IAType[] fieldTypes;
+ private final RecordField[] closedFields;
+ private final int numSchemaFields;
+ private final boolean hasOptionalFields;
+ private final int nullBitMapSize;
+ private byte[] bytes;
+
+ public SortedRecord(ARecordType recordType) {
+ String[] fieldNames = recordType.getFieldNames();
+ this.numSchemaFields = fieldNames.length;
+ this.recordType = recordType;
+ this.fieldTypes = recordType.getFieldTypes();
+ this.hasOptionalFields = NonTaggedFormatUtil.hasOptionalField(recordType);
+ this.nullBitMapSize = RecordUtil.computeNullBitmapSize(hasOptionalFields, recordType);
+ this.closedFields = new RecordField[numSchemaFields];
+ UTF8StringWriter utf8Writer = new UTF8StringWriter();
+ ByteArrayAccessibleOutputStream byteArrayStream = new ByteArrayAccessibleOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayStream);
+ try {
+ // write each closed field into a pointable, create a new field and add to the list of closedFields
+ for (int i = 0; i < numSchemaFields; i++) {
+ int nameStart = dataOutputStream.size();
+ utf8Writer.writeUTF8(fieldNames[i], dataOutputStream);
+ int nameEnd = dataOutputStream.size();
+ UTF8StringPointable utf8Pointable = UTF8StringPointable.FACTORY.createPointable();
+ utf8Pointable.set(byteArrayStream.getByteArray(), nameStart, nameEnd - nameStart);
+ RecordField field = new RecordField();
+ field.set(utf8Pointable, -1, -1, -1, null);
+ closedFields[i] = field;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * This method should be called before using the record. It recalculates logical indices and offsets of fields,
+ * closed and open. It populates the utf8 filed names for open.
+ */
+ public final void reset(byte[] data, int start) throws HyracksDataException {
+ bytes = data;
+ reset();
+ boolean isExpanded = false;
+ // advance to expanded byte if present
+ int cursor = start + ARecordPointable.TAG_SIZE + ARecordPointable.RECORD_LENGTH_SIZE;
+ if (recordType.isOpen()) {
+ isExpanded = bytes[cursor] == 1;
+ // advance to either open part offset or number of closed fields
+ cursor += ARecordPointable.EXPANDED_SIZE;
+ }
+ int openPartOffset = 0;
+ if (isExpanded) {
+ openPartOffset = start + AInt32SerializerDeserializer.getInt(bytes, cursor);
+ // advance to number of closed fields
+ cursor += ARecordPointable.OPEN_OFFSET_SIZE;
+ }
+ int fieldOffset;
+ int length;
+ int fieldIndex = 0;
+ ATypeTag tag;
+ // advance to where fields offsets are (or to null bit map if the schema has optional fields)
+ cursor += ARecordPointable.CLOSED_COUNT_SIZE;
+ int nullBitMapOffset = cursor;
+ int fieldsOffsets = cursor + nullBitMapSize;
+ // compute the offsets of each closed field value and whether it's missing or null
+ for (int i = 0; i < numSchemaFields; i++, fieldIndex++) {
+ fieldOffset = AInt32SerializerDeserializer.getInt(bytes, fieldsOffsets) + start;
+ tag = TypeComputeUtils.getActualType(fieldTypes[i]).getTypeTag();
+ if (hasOptionalFields) {
+ byte nullBits = bytes[nullBitMapOffset + i / 4];
+ if (RecordUtil.isNull(nullBits, i)) {
+ tag = ATypeTag.NULL;
+ } else if (RecordUtil.isMissing(nullBits, i)) {
+ tag = ATypeTag.MISSING;
+ }
+ }
+ length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, tag, false);
+ closedFields[i].set(fieldIndex, fieldOffset, length, tag);
+ if (tag != ATypeTag.MISSING) {
+ sortedFields.add(closedFields[i]);
+ }
+ fieldsOffsets += ARecordPointable.FIELD_OFFSET_SIZE;
+ }
+ // then populate open fields info second, an open field has name + value (tagged)
+ if (isExpanded) {
+ int numberOpenFields = AInt32SerializerDeserializer.getInt(bytes, openPartOffset);
+ fieldOffset = openPartOffset + ARecordPointable.OPEN_COUNT_SIZE + (8 * numberOpenFields);
+ for (int i = 0; i < numberOpenFields; i++, fieldIndex++) {
+ // get a pointable to the field name
+ length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, ATypeTag.STRING, false);
+ UTF8StringPointable openFieldName = utf8Pool.allocate(null);
+ openFieldName.set(bytes, fieldOffset, length);
+ // move to the value
+ fieldOffset += length;
+ tag = ATypeTag.VALUE_TYPE_MAPPING[bytes[fieldOffset]];
+ // +1 to account for the tag included since the field is open
+ length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, tag, true) + 1;
+ RecordField openField = recordFieldPool.allocate(null);
+ openField.set(openFieldName, fieldIndex, fieldOffset, length, tag);
+ sortedFields.add(openField);
+ // then skip the value to the next field name
+ fieldOffset += length;
+ }
+ }
+ }
+
+ private void reset() {
+ sortedFields.clear();
+ utf8Pool.reset();
+ recordFieldPool.reset();
+ }
+
+ public final boolean isEmpty() {
+ return sortedFields.isEmpty();
+ }
+
+ public final RecordField poll() {
+ return sortedFields.poll();
+ }
+
+ public final int size() {
+ return sortedFields.size();
+ }
+
+ public final IAType getFieldType(RecordField field) throws HyracksDataException {
+ return RecordUtil.getType(recordType, field.getIndex(), field.getValueTag());
+ }
+
+ public final void getFieldValue(RecordField field, IPointable pointable, ArrayBackedValueStorage storage)
+ throws IOException {
+ int fieldIdx = field.getIndex();
+ if (fieldIdx >= numSchemaFields) {
+ // open field
+ pointable.set(bytes, field.getValueOffset(), field.getValueLength());
+ } else {
+ // closed field
+ if (field.getValueTag() == ATypeTag.MISSING) {
+ pointable.set(MISSING_BYTES, 0, MISSING_BYTES.length);
+ } else if (field.getValueTag() == ATypeTag.NULL) {
+ pointable.set(NULL_BYTES, 0, NULL_BYTES.length);
+ } else {
+ // TODO(ali): this is not ideal. should not need to copy the tag when tagged pointables are introduced
+ int start = storage.getLength();
+ storage.getDataOutput().writeByte(field.getValueTag().serialize());
+ storage.getDataOutput().write(bytes, field.getValueOffset(), field.getValueLength());
+ int end = storage.getLength();
+ pointable.set(storage.getByteArray(), start, end - start);
+ }
+ }
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
index 941f59f..53ae805 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
@@ -122,7 +122,7 @@
VALUE_TYPE_MAPPING = typeList.toArray(new ATypeTag[typeList.size()]);
}
- ATypeTag(int value) {
+ private ATypeTag(int value) {
this.value = (byte) value;
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
index e4167ec..5d97125 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
@@ -85,7 +85,6 @@
case OBJECT:
return RecordUtil.FULLY_OPEN_RECORD_TYPE;
case MULTISET:
- // TODO: how come the item type in this instance is "null"
return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE;
case ARRAY:
return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java
index bdc1943..78c411a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java
@@ -24,10 +24,8 @@
import java.util.List;
/**
- * Object pool backed by a list.
- * The argument for creating E instances could be different. This class also
- * considers arguments in object reusing, e.g., it reuses an E instances ONLY
- * when the construction argument is "equal".
+ * Object pool backed by a list. The argument for creating E instances could be different. This class also considers
+ * arguments in object reusing, e.g., it reuses an E instances ONLY when the construction argument is "equal".
*/
public class ListObjectPool<E, T> implements IObjectPool<E, T> {
@@ -36,12 +34,12 @@
/**
* cache of objects
*/
- private List<E> pool = new ArrayList<E>();
+ private List<E> pool = new ArrayList<>();
/**
* args that are used to create each element in the pool
*/
- private List<T> args = new ArrayList<T>();
+ private List<T> args = new ArrayList<>();
/**
* bits indicating which element is in use
@@ -85,7 +83,7 @@
@Override
public boolean free(E element) {
- for (int i = pool.size() - 1; i >= 0; i--) {
+ for (int i = usedBits.length(); (i = usedBits.previousSetBit(i - 1)) >= 0;) {
if (element == pool.get(i)) {
usedBits.clear(i);
return true;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java
index 763e1d4..2c6e408 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java
@@ -20,8 +20,11 @@
import java.util.BitSet;
+import org.apache.asterix.om.pointables.nonvisitor.SortedRecord;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -36,8 +39,10 @@
private ObjectFactories() {
}
- public static final IObjectFactory<IPointable, Void> VOID_FACTORY = (type) -> new VoidPointable();
+ public static final IObjectFactory<IPointable, Void> VOID_FACTORY = type -> new VoidPointable();
public static final IObjectFactory<IMutableValueStorage, Void> STORAGE_FACTORY =
- (type) -> new ArrayBackedValueStorage();
- public static final IObjectFactory<BitSet, Void> BIT_SET_FACTORY = (type) -> new BitSet();
+ type -> new ArrayBackedValueStorage();
+ public static final IObjectFactory<BitSet, Void> BIT_SET_FACTORY = type -> new BitSet();
+ public static final IObjectFactory<UTF8StringPointable, Void> UTF8_FACTORY = type -> new UTF8StringPointable();
+ public static final IObjectFactory<SortedRecord, ARecordType> RECORD_FACTORY = SortedRecord::new;
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
index ec1c7cb..cdbe8d3 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
@@ -20,9 +20,13 @@
import java.util.List;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class RecordUtil {
/**
@@ -35,44 +39,52 @@
}
/**
- * Create a fully open record type with the passed name
- *
- * @param name
- * @return
- */
- public static ARecordType createOpenRecordType(String name) {
- return new ARecordType(name, new String[0], new IAType[0], true);
- }
-
- /**
- * A util method that takes a field name and return a String representation for error messages
- *
- * @param field
- * @return
+ * @param field a field name represented by a list
+ * @return string version of the field
*/
public static String toFullyQualifiedName(List<String> field) {
return StringUtils.join(field, ".");
}
/**
- * A util method that takes String array and return a String representation for error messages
- *
- * @param field
- * @return
+ * @param names a hierarchy of entity names
+ * @return string version of the entity name to be qualified
*/
public static String toFullyQualifiedName(String... names) {
return StringUtils.join(names, ".");
}
/**
- * compute the null Bitmap size for the open fields
+ * Computes the null Bitmap size when the schema has optional fields (nullable/missable)
*
- * @param recordType
- * the record type
- * @return the size of the bitmap
+ * @param recordType the record type
+ * @return the size of the bitmap in number of bytes
*/
public static int computeNullBitmapSize(ARecordType recordType) {
- return NonTaggedFormatUtil.hasOptionalField(recordType)
- ? (int) Math.ceil(recordType.getFieldNames().length / 4.0) : 0;
+ return computeNullBitmapSize(NonTaggedFormatUtil.hasOptionalField(recordType), recordType);
+ }
+
+ public static int computeNullBitmapSize(boolean hasOptionalField, ARecordType recordType) {
+ // each field needs 2 bits for MISSING, NULL, and VALUE. for 4 fields, 4*2=8 bits (1 byte), thus divide by 4.
+ return hasOptionalField ? (int) Math.ceil(recordType.getFieldTypes().length / 4.0) : 0;
+ }
+
+ public static boolean isNull(byte nullMissingBits, int fieldIndex) {
+ int position = 1 << (7 - 2 * (fieldIndex % 4));
+ return (nullMissingBits & position) == 0;
+ }
+
+ public static boolean isMissing(byte nullMissingBits, int fieldIndex) {
+ int position = 1 << (7 - 2 * (fieldIndex % 4) - 1);
+ return (nullMissingBits & position) == 0;
+ }
+
+ public static IAType getType(ARecordType recordType, int fieldIdx, ATypeTag fieldTag) throws HyracksDataException {
+ IAType[] fieldTypes = recordType.getFieldTypes();
+ if (fieldIdx >= fieldTypes.length) {
+ return fieldTag.isDerivedType() ? DefaultOpenFieldType.getDefaultOpenFieldType(fieldTag)
+ : TypeTagUtil.getBuiltinTypeByTag(fieldTag);
+ }
+ return fieldTypes[fieldIdx];
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
index f683615..4137581 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -154,6 +154,14 @@
return UTF8StringUtil.compareTo(this.bytes, this.start, bytes, start);
}
+ // TODO(ali): could use the normalized key, too.
+ // takes advantage of cached utf8 length and meta length
+ public static int compare(UTF8StringPointable pointable1, UTF8StringPointable pointable2) {
+ return UTF8StringUtil.compareTo(pointable1.bytes, pointable1.start + pointable1.metaLength,
+ pointable1.utf8Length, pointable2.bytes, pointable2.start + pointable2.metaLength,
+ pointable2.utf8Length);
+ }
+
@Override
public int hash() {
if (hashValue == 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index 3b3e7b4..9aa2b5d 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -241,6 +241,12 @@
return compareTo(thisBytes, thisStart, thatBytes, thatStart, false, false);
}
+ // the start and length of each are the ones calculated by UTF8StringPointable. caller should provide proper values
+ public static int compareTo(byte[] thisBytes, int thisStart, int thisLength, byte[] thatBytes, int thatStart,
+ int thatLength) {
+ return compareTo(thisBytes, thisStart, thisLength, thatBytes, thatStart, thatLength, false, false);
+ }
+
/**
* This function provides the raw bytes-based comparison for UTF8 strings.
* Note that the comparison may not deliver the correct ordering for certain languages that include 2 or 3 bytes characters.