[ASTERIXDB-3446][RT] cast enhancements
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Avoid allocating pointable objects for all values at once.
Instead, allocate for one value at a time and free object
to make it re-usable for the next value.
- Avoid copying the data from the original input to buffers.
Instead, maintain indexes pointing to the positions of values.
When a value is needed, it will be looked up using the indexes.
Change-Id: Id7945b8d595353f1ea82ba3f1cc87f35ee8055a2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18445
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
index ea29643..69e025b 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
@@ -59,6 +59,16 @@
public void addField(IValueReference name, IValueReference value) throws HyracksDataException;
/**
+ * @param name
+ * The field name without the tag.
+ * @param value
+ * The field value.
+ * @throws HyracksDataException
+ * if the field name conflicts with a closed field name
+ */
+ public void addNonTaggedFieldName(IValueReference name, IValueReference value) throws HyracksDataException;
+
+ /**
* @param out
* Stream to write data to.
* @param writeTypeTag
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
index d08865b..e600201 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/RecordBuilder.java
@@ -178,7 +178,21 @@
}
@Override
+ public void addNonTaggedFieldName(IValueReference name, IValueReference value) throws HyracksDataException {
+ addField(ATypeTag.SERIALIZED_STRING_TYPE_TAG, name.getStartOffset(), name.getLength(), name.getByteArray(),
+ value);
+ }
+
+ @Override
public void addField(IValueReference name, IValueReference value) throws HyracksDataException {
+ // +1 to move from the tag to the start of the field name value
+ byte[] nameBytes = name.getByteArray();
+ int start = name.getStartOffset();
+ addField(nameBytes[start], start + 1, name.getLength() - 1, nameBytes, value);
+ }
+
+ private void addField(byte nameTag, int nameStart, int nameLength, byte[] nameBytes, IValueReference value)
+ throws HyracksDataException {
byte[] data = value.getByteArray();
int offset = value.getStartOffset();
@@ -186,17 +200,12 @@
if (data[offset] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
return;
}
- byte[] nameBytes = name.getByteArray();
- int nameOffset = name.getStartOffset();
// ignore adding fields with NULL/MISSING names
- if (nameBytes[nameOffset] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG
- || nameBytes[nameOffset] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+ if (nameTag == ATypeTag.SERIALIZED_MISSING_TYPE_TAG || nameTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
// TODO(ali): issue a warning
return;
}
// ignore adding duplicate fields
- int nameStart = nameOffset + 1;
- int nameLength = name.getLength() - 1;
if (recType != null && recTypeInfo.getFieldIndex(nameBytes, nameStart, nameLength) >= 0) {
// TODO(ali): issue a warning
return;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AFlatValueCastingPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AFlatValueCastingPointable.java
new file mode 100644
index 0000000..dffe34e
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AFlatValueCastingPointable.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables;
+
+import org.apache.asterix.om.pointables.base.ICastingPointable;
+import org.apache.asterix.om.pointables.visitor.ICastingPointableVisitor;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AFlatValueCastingPointable extends AbstractCastingPointable {
+
+ public static final ICastingPointable nullPointable =
+ new AFlatValueCastingPointable(new byte[] { ATypeTag.SERIALIZED_NULL_TYPE_TAG });
+ public static final ICastingPointable missingPointable =
+ new AFlatValueCastingPointable(new byte[] { ATypeTag.SERIALIZED_MISSING_TYPE_TAG });
+ public static final IObjectFactory<AFlatValueCastingPointable, IAType> FACTORY =
+ type -> new AFlatValueCastingPointable();
+
+ private AFlatValueCastingPointable() {
+
+ }
+
+ private AFlatValueCastingPointable(byte[] bytes) {
+ set(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public <R, T> R accept(ICastingPointableVisitor<R, T> visitor, T arg) throws HyracksDataException {
+ return visitor.visit(this, arg);
+ }
+
+ @Override
+ public Type getType() {
+ return Type.FLAT;
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AListCastingPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AListCastingPointable.java
new file mode 100644
index 0000000..8266991
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AListCastingPointable.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables;
+
+import java.io.DataOutputStream;
+
+import org.apache.asterix.builders.AbstractListBuilder;
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.builders.UnorderedListBuilder;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.base.ICastingPointable;
+import org.apache.asterix.om.pointables.cast.ACastingPointableVisitor;
+import org.apache.asterix.om.pointables.cast.CastResult;
+import org.apache.asterix.om.pointables.visitor.ICastingPointableVisitor;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.ResettableByteArrayOutputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+public class AListCastingPointable extends AbstractCastingPointable {
+
+ public static final IObjectFactory<AListCastingPointable, IAType> FACTORY =
+ type -> new AListCastingPointable((AbstractCollectionType) type);
+ private final ResettableByteArrayOutputStream outBos = new ResettableByteArrayOutputStream();
+ private final DataOutputStream outDos = new DataOutputStream(outBos);
+ private final CastResult itemCastResult = new CastResult(new VoidPointable(), null);
+ private final UnorderedListBuilder unOrderedListBuilder = new UnorderedListBuilder();
+ private final OrderedListBuilder orderedListBuilder = new OrderedListBuilder();
+ private final IAType itemType;
+ private final boolean typedItemList;
+ private final boolean ordered;
+
+ public AListCastingPointable(AbstractCollectionType inputType) {
+ ordered = inputType instanceof AOrderedListType;
+ itemType = TypeComputeUtils.getActualType(inputType.getItemType());
+ typedItemList = itemType.getTypeTag() != ATypeTag.ANY;
+ }
+
+ @Override
+ public <R, T> R accept(ICastingPointableVisitor<R, T> visitor, T arg) throws HyracksDataException {
+ return visitor.visit(this, arg);
+ }
+
+ @Override
+ public Type getType() {
+ return Type.LIST;
+ }
+
+ @Override
+ protected void reset() {
+ super.reset();
+ outBos.reset();
+ }
+
+ public void castList(IPointable castOutResult, AbstractCollectionType reqType, ACastingPointableVisitor visitor)
+ throws HyracksDataException {
+ reset();
+ int tagByte = isTagged() ? 1 : 0;
+ int numberOfItems = AInt32SerializerDeserializer.getInt(data, start + 5 + tagByte);
+ int itemOffset;
+ if (typedItemList && NonTaggedFormatUtil.isFixedSizedCollection(itemType.getTypeTag())) {
+ itemOffset = start + 9 + tagByte;
+ } else {
+ itemOffset = start + 9 + tagByte + (numberOfItems * 4);
+ }
+ IAType reqItemType;
+ AbstractListBuilder listBuilder;
+ if (reqType.getTypeTag() == ATypeTag.MULTISET) {
+ unOrderedListBuilder.reset(reqType);
+ listBuilder = unOrderedListBuilder;
+ reqItemType = reqType.getItemType();
+ } else if (reqType.getTypeTag() == ATypeTag.ARRAY) {
+ orderedListBuilder.reset(reqType);
+ listBuilder = orderedListBuilder;
+ reqItemType = reqType.getItemType();
+ } else {
+ throw new RuntimeException("NYI: " + reqType);
+ }
+ int itemLength;
+ if (typedItemList) {
+ for (int i = 0; i < numberOfItems; i++) {
+ itemLength = NonTaggedFormatUtil.getFieldValueLength(data, itemOffset, itemType.getTypeTag(), false);
+ castItem(itemOffset, itemLength, itemType, itemType.getTypeTag(), reqItemType, visitor, listBuilder,
+ false);
+ itemOffset += itemLength;
+ }
+ } else {
+ for (int i = 0; i < numberOfItems; i++) {
+ ATypeTag itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[itemOffset]);
+ itemLength = NonTaggedFormatUtil.getFieldValueLength(data, itemOffset, itemTag, true) + 1;
+ IAType itemType = TypeTagUtil.getBuiltinTypeByTag(itemTag);
+ castItem(itemOffset, itemLength, itemType, itemTag, reqItemType, visitor, listBuilder, true);
+ itemOffset += itemLength;
+ }
+ }
+ int outStart = outBos.size();
+ listBuilder.write(outDos, true);
+ int outEnd = outBos.size();
+ castOutResult.set(outBos.getByteArray(), outStart, outEnd - outStart);
+ }
+
+ private void castItem(int itemOffset, int itemLength, IAType itemType, ATypeTag itemTag, IAType reqItemType,
+ ACastingPointableVisitor visitor, AbstractListBuilder listBuilder, boolean openItem)
+ throws HyracksDataException {
+ ICastingPointable item = allocate(data, itemOffset, itemLength, itemType, itemTag, openItem);
+ IAType outType = reqItemType == null ? null : TypeComputeUtils.getActualType(reqItemType);
+ if (outType == null || outType.getTypeTag() == ATypeTag.ANY) {
+ itemCastResult.setOutType(DefaultOpenFieldType.getDefaultOpenFieldType(itemTag));
+ } else {
+ itemCastResult.setOutType(outType);
+ }
+ item.accept(visitor, itemCastResult);
+ listBuilder.addItem(itemCastResult.getOutPointable());
+ free(item);
+ }
+
+ public boolean ordered() {
+ return ordered;
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordCastingPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordCastingPointable.java
new file mode 100644
index 0000000..7806c1b
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordCastingPointable.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables;
+
+import static org.apache.asterix.om.pointables.AFlatValueCastingPointable.missingPointable;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.base.ICastingPointable;
+import org.apache.asterix.om.pointables.cast.ACastingPointableVisitor;
+import org.apache.asterix.om.pointables.cast.CastResult;
+import org.apache.asterix.om.pointables.visitor.ICastingPointableVisitor;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.om.utils.ResettableByteArrayOutputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+public class ARecordCastingPointable extends AbstractCastingPointable {
+
+ public static final IObjectFactory<ARecordCastingPointable, IAType> FACTORY =
+ type -> new ARecordCastingPointable((ARecordType) type);
+
+ private final CastResult fieldCastResult = new CastResult(new VoidPointable(), null);
+ private final ResettableByteArrayOutputStream inClosedFieldsNames = new ResettableByteArrayOutputStream();
+ private final ResettableByteArrayOutputStream reqFieldNames = new ResettableByteArrayOutputStream();
+ private final DataOutputStream reqFieldNamesDos = new DataOutputStream(reqFieldNames);
+ private final ByteArrayAccessibleOutputStream outBos = new ByteArrayAccessibleOutputStream();
+ private final DataOutputStream outDos = new DataOutputStream(outBos);
+ private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+ private final RecordBuilder recBuilder = new RecordBuilder();
+ private final IPointable namePointable = new VoidPointable();
+ private final ARecordType inRecType;
+ private ARecordType cachedReqType;
+
+ private final IntArrayList reqFieldsNamesOffset = new IntArrayList();
+ private final IntArrayList inFieldsNamesOffset;
+ private final IntArrayList inFieldsNamesLength;
+ private final IntArrayList inFieldsOffsets;
+ private final IntArrayList inFieldsLength;
+ private final List<ATypeTag> inFieldTags;
+ private final int inSchemaFieldsCount;
+ private int inFieldsCount;
+ private boolean[] inFieldsToOpen;
+ private int[] inFieldNamesSorted;
+ private int[] reqFieldsPositions;
+ private int[] reqFieldNamesSorted;
+ private boolean[] reqOptionalFields;
+
+ public ARecordCastingPointable(ARecordType inputType) {
+ this.inRecType = inputType;
+ IAType[] fieldTypes = inputType.getFieldTypes();
+ String[] fieldNames = inputType.getFieldNames();
+ inSchemaFieldsCount = fieldTypes.length;
+ inFieldsNamesOffset = new IntArrayList(inSchemaFieldsCount);
+ inFieldsNamesLength = new IntArrayList(inSchemaFieldsCount);
+ inFieldsOffsets = new IntArrayList(inSchemaFieldsCount);
+ inFieldsLength = new IntArrayList(inSchemaFieldsCount);
+ inFieldTags = new ArrayList<>(inSchemaFieldsCount);
+ try {
+ DataOutputStream dos = new DataOutputStream(inClosedFieldsNames);
+ for (int i = 0; i < inSchemaFieldsCount; i++) {
+ int nameStart = inClosedFieldsNames.size();
+ inFieldsNamesOffset.add(nameStart);
+ utf8Writer.writeUTF8(fieldNames[i], dos);
+ int nameEnd = inClosedFieldsNames.size();
+ inFieldsNamesLength.add(nameEnd - nameStart);
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public <R, T> R accept(ICastingPointableVisitor<R, T> visitor, T arg) throws HyracksDataException {
+ return visitor.visit(this, arg);
+ }
+
+ @Override
+ public Type getType() {
+ return Type.RECORD;
+ }
+
+ @Override
+ protected void reset() {
+ super.reset();
+ inFieldsNamesOffset.size(inSchemaFieldsCount);
+ inFieldsNamesLength.size(inSchemaFieldsCount);
+ inFieldsOffsets.clear();
+ inFieldsLength.clear();
+ inFieldTags.clear();
+ }
+
+ @Override
+ public void prepare(byte[] b, int start) {
+ reset();
+ prepareRecordInfo(b, start);
+ }
+
+ private void prepareRecordInfo(byte[] b, int start) {
+ boolean isExpanded = false;
+ int openPartOffset = 0;
+ int tagByte = isTagged() ? 1 : 0;
+ int adjust = isTagged() ? 0 : 1;
+ // advance to either isExpanded or numberOfSchemaFields
+ int pointer = start + 4 + tagByte;
+ if (inRecType.isOpen()) {
+ isExpanded = b[pointer] == 1;
+ // advance either to openPartOffset or numberOfSchemaFields
+ pointer += 1;
+ if (isExpanded) {
+ openPartOffset = start + AInt32SerializerDeserializer.getInt(b, pointer) - adjust;
+ // advance to numberOfSchemaFields
+ pointer += 4;
+ }
+ }
+ try {
+ if (inSchemaFieldsCount > 0) {
+ // advance to nullBitMap if hasOptionalFields, or fieldOffsets
+ pointer += 4;
+ prepareClosedFieldsInfo(b, start, adjust, pointer);
+ }
+ int openFieldsCount = 0;
+ if (isExpanded) {
+ openFieldsCount = AInt32SerializerDeserializer.getInt(b, openPartOffset);
+ prepareOpenFieldsInfo(b, openPartOffset, openFieldsCount);
+ }
+ resetForNewInFields(openFieldsCount);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void prepareClosedFieldsInfo(byte[] b, int start, int adjust, int currentPointer)
+ throws HyracksDataException {
+ int offsetArrayPosition;
+ int nullBitMapPosition = 0;
+ boolean hasOptionalFields = NonTaggedFormatUtil.hasOptionalField(inRecType);
+ if (hasOptionalFields) {
+ nullBitMapPosition = currentPointer;
+ offsetArrayPosition = currentPointer
+ + (this.inSchemaFieldsCount % 4 == 0 ? inSchemaFieldsCount / 4 : inSchemaFieldsCount / 4 + 1);
+ } else {
+ offsetArrayPosition = currentPointer;
+ }
+ for (int field = 0; field < inSchemaFieldsCount; field++, offsetArrayPosition += 4) {
+ int fieldOffset = AInt32SerializerDeserializer.getInt(b, offsetArrayPosition) + start - adjust;
+ inFieldsOffsets.add(fieldOffset);
+ if (hasOptionalFields) {
+ byte byteTag = b[nullBitMapPosition + field / 4];
+ if (RecordUtil.isNull(byteTag, field)) {
+ inFieldsLength.add(0);
+ inFieldTags.add(ATypeTag.NULL);
+ continue;
+ }
+ if (RecordUtil.isMissing(byteTag, field)) {
+ inFieldsLength.add(0);
+ inFieldTags.add(ATypeTag.MISSING);
+ continue;
+ }
+ }
+ IAType[] fieldTypes = inRecType.getFieldTypes();
+ int fieldValueLength;
+ ATypeTag fieldTag;
+ if (fieldTypes[field].getTypeTag() == ATypeTag.UNION) {
+ if (((AUnionType) fieldTypes[field]).isUnknownableType()) {
+ IAType fieldType = ((AUnionType) fieldTypes[field]).getActualType();
+ fieldTag = fieldType.getTypeTag();
+ fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, fieldTag, false);
+ } else {
+ throw new IllegalStateException("Got unexpected UNION type " + fieldTypes[field]);
+ }
+ } else {
+ fieldTag = fieldTypes[field].getTypeTag();
+ fieldValueLength = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, fieldTag, false);
+ }
+ inFieldsLength.add(fieldValueLength);
+ inFieldTags.add(fieldTag);
+ }
+ }
+
+ private void prepareOpenFieldsInfo(byte[] b, int openPartOffset, int openFieldsCount) throws HyracksDataException {
+ int fieldOffset = openPartOffset + 4 + (8 * openFieldsCount);
+ for (int i = 0; i < openFieldsCount; i++) {
+ inFieldsNamesOffset.add(fieldOffset);
+ int fieldValueLen = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, ATypeTag.STRING, false);
+ inFieldsNamesLength.add(fieldValueLen);
+ fieldOffset += fieldValueLen;
+ ATypeTag fieldTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[fieldOffset]);
+ inFieldTags.add(fieldTag);
+ inFieldsOffsets.add(fieldOffset);
+ // +1 to include the tag in the length since field offset starts at the tag
+ fieldValueLen = NonTaggedFormatUtil.getFieldValueLength(b, fieldOffset, fieldTag, true) + 1;
+ inFieldsLength.add(fieldValueLen);
+ fieldOffset += fieldValueLen;
+ }
+ }
+
+ private void resetForNewInFields(int openFieldsCount) {
+ inFieldsCount = inSchemaFieldsCount + openFieldsCount;
+ if (inFieldsToOpen == null || inFieldsCount > inFieldsToOpen.length) {
+ inFieldsToOpen = new boolean[inFieldsCount];
+ inFieldNamesSorted = new int[inFieldsCount];
+ }
+ for (int i = 0; i < inFieldsCount; i++) {
+ inFieldsToOpen[i] = true;
+ inFieldNamesSorted[i] = i;
+ }
+ }
+
+ public void castRecord(IPointable castOutResult, ARecordType reqType, ACastingPointableVisitor visitor)
+ throws HyracksDataException {
+ ensureRequiredTypeSatisfied(reqType);
+ outBos.reset();
+ writeOutput(outDos, visitor);
+ castOutResult.set(outBos.getByteArray(), 0, outBos.size());
+ }
+
+ private void ensureRequiredTypeSatisfied(ARecordType reqType) throws HyracksDataException {
+ if (!reqType.equals(cachedReqType)) {
+ loadRequiredType(reqType);
+ }
+ Arrays.fill(reqFieldsPositions, -1);
+ ensureClosedPart();
+ }
+
+ private void loadRequiredType(ARecordType reqType) throws HyracksDataException {
+ cachedReqType = reqType;
+ IAType[] fieldTypes = reqType.getFieldTypes();
+ String[] fieldNames = reqType.getFieldNames();
+ int numSchemaFields = fieldTypes.length;
+ reqFieldsNamesOffset.size(numSchemaFields);
+ reqFieldsNamesOffset.clear();
+ reqFieldsPositions = new int[numSchemaFields];
+ reqOptionalFields = new boolean[numSchemaFields];
+ reqFieldNamesSorted = new int[numSchemaFields];
+ reqFieldNames.reset();
+ for (int i = 0; i < numSchemaFields; i++) {
+ int nameStart = reqFieldNames.size(); // TODO use this?
+ String reqFieldName = fieldNames[i];
+ try {
+ utf8Writer.writeUTF8(reqFieldName, reqFieldNamesDos);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ reqFieldsNamesOffset.add(nameStart);
+ if (NonTaggedFormatUtil.isOptional(fieldTypes[i])) {
+ reqOptionalFields[i] = true;
+ }
+ }
+ for (int i = 0; i < reqFieldNamesSorted.length; i++) {
+ reqFieldNamesSorted[i] = i;
+ }
+ // sort the field name index
+ quickSort(reqFieldNamesSorted, this::compareRequiredFieldNames, 0, reqFieldNamesSorted.length - 1);
+ }
+
+ private void ensureClosedPart() throws HyracksDataException {
+ int inFnStart = 0;
+ int reqFnStart = 0;
+ if (inFnStart < inFieldsCount && reqFnStart < reqFieldsNamesOffset.size()) {
+ quickSort(inFieldNamesSorted, this::compareInputFieldNames, 0, inFieldsCount - 1);
+ }
+ while (inFnStart < inFieldsCount && reqFnStart < reqFieldsNamesOffset.size()) {
+ int inFnPos = inFieldNamesSorted[inFnStart];
+ int reqFnPos = reqFieldNamesSorted[reqFnStart];
+ int c = compareInputToRequired(inFnPos, reqFnPos);
+ if (c == 0) {
+ ATypeTag inFieldTag = inFieldTags.get(inFnPos);
+ ATypeTag reqFieldTag = getRequiredFieldTag(reqFnPos);
+ if (inFieldTag == reqFieldTag || acceptsNullMissing(reqFnPos, inFieldTag)
+ || canPromoteDemote(inFieldTag, reqFieldTag)) {
+ reqFieldsPositions[reqFnPos] = inFnPos;
+ inFieldsToOpen[inFnPos] = false;
+ } else {
+ throw new RuntimeDataException(ErrorCode.CASTING_FIELD, inFieldTag, reqFieldTag);
+ }
+ inFnStart++;
+ reqFnStart++;
+ }
+ if (c > 0) {
+ reqFnStart++;
+ }
+ if (c < 0) {
+ inFnStart++;
+ }
+ }
+
+ for (int i = 0; i < inFieldsToOpen.length; i++) {
+ if (inFieldsToOpen[i] && !cachedReqType.isOpen()) {
+ // required type is closed and cannot allow extra fields to be placed in the open section
+ String fieldName = UTF8StringUtil.toString(getInFieldNames(i), inFieldsNamesOffset.getInt(i));
+ ATypeTag fieldTag = inFieldTags.get(i);
+ throw new RuntimeDataException(ErrorCode.TYPE_MISMATCH_EXTRA_FIELD, fieldName + ":" + fieldTag);
+ }
+ }
+
+ for (int i = 0; i < reqFieldsPositions.length; i++) {
+ if (reqFieldsPositions[i] < 0) {
+ IAType t = cachedReqType.getFieldTypes()[i];
+ // if the (required) field is optional, it's ok if it does not exist in the input record
+ if (!NonTaggedFormatUtil.isOptional(t)) {
+ // the field is required and not optional, fail since the input record does not have it
+ throw new RuntimeDataException(ErrorCode.TYPE_MISMATCH_MISSING_FIELD,
+ cachedReqType.getFieldNames()[i], t.getTypeName());
+ }
+ }
+ }
+ }
+
+ private byte[] getInFieldNames(int fieldId) {
+ return fieldId < inSchemaFieldsCount ? inClosedFieldsNames.getByteArray() : data;
+ }
+
+ private ATypeTag getRequiredFieldTag(int fieldId) {
+ return TypeComputeUtils.getActualType(cachedReqType.getFieldTypes()[fieldId]).getTypeTag();
+ }
+
+ private static boolean canPromoteDemote(ATypeTag inFieldTag, ATypeTag reqFieldTag) {
+ return ATypeHierarchy.canPromote(inFieldTag, reqFieldTag) || ATypeHierarchy.canDemote(inFieldTag, reqFieldTag);
+ }
+
+ private boolean acceptsNullMissing(int reqFnPos, ATypeTag inFieldTag) {
+ return reqOptionalFields[reqFnPos] && (inFieldTag == ATypeTag.NULL || inFieldTag == ATypeTag.MISSING);
+ }
+
+ private void writeOutput(DataOutput output, ACastingPointableVisitor visitor) throws HyracksDataException {
+ recBuilder.reset(cachedReqType);
+ recBuilder.init();
+ writeClosedPart(visitor);
+ writeOpenPart(visitor);
+ recBuilder.write(output, true);
+ }
+
+ private void writeClosedPart(ACastingPointableVisitor visitor) throws HyracksDataException {
+ for (int i = 0; i < reqFieldsPositions.length; i++) {
+ int pos = reqFieldsPositions[i];
+ ICastingPointable field = pos >= 0 ? allocateField(pos) : missingPointable;
+ IAType fieldType = cachedReqType.getFieldTypes()[i];
+ fieldCastResult.setOutType(fieldType);
+ if (reqOptionalFields[i]) {
+ ATypeTag fieldTag = pos >= 0 ? inFieldTags.get(pos) : null;
+ if (fieldTag == null || fieldTag == ATypeTag.MISSING) {
+ fieldCastResult.setOutType(BuiltinType.AMISSING);
+ } else if (fieldTag == ATypeTag.NULL) {
+ fieldCastResult.setOutType(BuiltinType.ANULL);
+ } else {
+ fieldCastResult.setOutType(((AUnionType) fieldType).getActualType());
+ }
+ }
+ field.accept(visitor, fieldCastResult);
+ recBuilder.addField(i, fieldCastResult.getOutPointable());
+ free(field);
+ }
+ }
+
+ private void writeOpenPart(ACastingPointableVisitor visitor) throws HyracksDataException {
+ for (int i = 0; i < inFieldsCount; i++) {
+ if (inFieldsToOpen[i]) {
+ ICastingPointable field = allocateField(i);
+ ATypeTag fieldTag = inFieldTags.get(i);
+ fieldCastResult.setOutType(DefaultOpenFieldType.getDefaultOpenFieldType(fieldTag));
+ field.accept(visitor, fieldCastResult);
+ IPointable name = getInFieldName(i);
+ recBuilder.addNonTaggedFieldName(name, fieldCastResult.getOutPointable());
+ free(field);
+ }
+ }
+ }
+
+ private ICastingPointable allocateField(int fieldId) throws HyracksDataException {
+ IAType fieldType;
+ boolean openField;
+ ATypeTag fieldTag = inFieldTags.get(fieldId);
+ if (fieldId < inSchemaFieldsCount) {
+ openField = false;
+ fieldType = TypeComputeUtils.getActualType(inRecType.getFieldTypes()[fieldId]);
+ } else {
+ openField = true;
+ fieldType = TypeTagUtil.getBuiltinTypeByTag(fieldTag);
+ }
+ return allocate(data, inFieldsOffsets.getInt(fieldId), inFieldsLength.getInt(fieldId), fieldType, fieldTag,
+ openField);
+ }
+
+ private IPointable getInFieldName(int fieldId) {
+ if (fieldId < inSchemaFieldsCount) {
+ namePointable.set(inClosedFieldsNames.getByteArray(), inFieldsNamesOffset.getInt(fieldId),
+ inFieldsNamesLength.getInt(fieldId));
+ } else {
+ namePointable.set(data, inFieldsNamesOffset.getInt(fieldId), inFieldsNamesLength.getInt(fieldId));
+ }
+ return namePointable;
+ }
+
+ private void quickSort(int[] index, Comparator comparator, int start, int end) {
+ if (end <= start) {
+ return;
+ }
+ int i = partition(index, comparator, start, end);
+ quickSort(index, comparator, start, i - 1);
+ quickSort(index, comparator, i + 1, end);
+ }
+
+ private int partition(int[] index, Comparator comparator, int left, int right) {
+ int i = left - 1;
+ int j = right;
+ while (true) {
+ while (comparator.compare(index[++i], index[right]) < 0) {
+ ;
+ }
+ while (comparator.compare(index[right], index[--j]) < 0) {
+ if (j == left) {
+ break;
+ }
+ }
+ if (i >= j) {
+ break;
+ }
+ swap(index, i, j);
+ }
+ swap(index, i, right);
+ return i;
+ }
+
+ private void swap(int[] array, int i, int j) {
+ int temp = array[i];
+ array[i] = array[j];
+ array[j] = temp;
+ }
+
+ private int compareInputToRequired(int a, int b) {
+ return UTF8StringUtil.compareTo(getInFieldNames(a), inFieldsNamesOffset.getInt(a), reqFieldNames.getByteArray(),
+ reqFieldsNamesOffset.getInt(b));
+ }
+
+ private int compareInputFieldNames(int i, int j) {
+ return UTF8StringUtil.compareTo(getInFieldNames(i), inFieldsNamesOffset.getInt(i), getInFieldNames(j),
+ inFieldsNamesOffset.getInt(j));
+ }
+
+ private int compareRequiredFieldNames(int i, int j) {
+ return UTF8StringUtil.compareTo(reqFieldNames.getByteArray(), reqFieldsNamesOffset.getInt(i),
+ reqFieldNames.getByteArray(), reqFieldsNamesOffset.getInt(j));
+ }
+
+ @FunctionalInterface
+ private interface Comparator {
+ int compare(int i, int j);
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AbstractCastingPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AbstractCastingPointable.java
new file mode 100644
index 0000000..631ee45
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/AbstractCastingPointable.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables;
+
+import static org.apache.asterix.om.pointables.AFlatValueCastingPointable.missingPointable;
+import static org.apache.asterix.om.pointables.AFlatValueCastingPointable.nullPointable;
+
+import org.apache.asterix.om.pointables.base.ICastingPointable;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public abstract class AbstractCastingPointable implements ICastingPointable {
+
+ private final IObjectPool<AFlatValueCastingPointable, IAType> flatValuePool =
+ new ListObjectPool<>(AFlatValueCastingPointable.FACTORY);
+ private final IObjectPool<ARecordCastingPointable, IAType> recordValuePool =
+ new ListObjectPool<>(ARecordCastingPointable.FACTORY);
+ private final IObjectPool<AListCastingPointable, IAType> listValuePool =
+ new ListObjectPool<>(AListCastingPointable.FACTORY);
+
+ protected byte[] data;
+ protected int start = -1;
+ protected int len = -1;
+ private boolean tagged;
+ private ATypeTag tag;
+
+ @Override
+ public byte[] getByteArray() {
+ return data;
+ }
+
+ @Override
+ public int getLength() {
+ return len;
+ }
+
+ @Override
+ public int getStartOffset() {
+ return start;
+ }
+
+ @Override
+ public final void set(IValueReference ivf) {
+ set(ivf.getByteArray(), ivf.getStartOffset(), ivf.getLength());
+ }
+
+ @Override
+ public final void set(byte[] bytes, int start, int length) {
+ setInfo(bytes, start, length, EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[start]), true);
+ }
+
+ @Override
+ public final void setUntagged(byte[] bytes, int start, int length, ATypeTag tag) {
+ setInfo(bytes, start, length, tag, false);
+ }
+
+ private void setInfo(byte[] bytes, int start, int length, ATypeTag tag, boolean tagged) {
+ this.data = bytes;
+ this.start = start;
+ this.len = length;
+ this.tag = tag;
+ this.tagged = tagged;
+ prepare(bytes, start);
+ }
+
+ @Override
+ public final ATypeTag getTag() {
+ return tag;
+ }
+
+ @Override
+ public boolean isTagged() {
+ return tagged;
+ }
+
+ protected void prepare(byte[] bytes, int start) {
+
+ }
+
+ protected ICastingPointable allocate(byte[] data, int valueStart, int valueLen, IAType valueType, ATypeTag valueTag,
+ boolean openValue) throws HyracksDataException {
+ if (valueTag == ATypeTag.NULL) {
+ return nullPointable;
+ }
+ if (valueTag == ATypeTag.MISSING) {
+ return missingPointable;
+ }
+ ICastingPointable pointable;
+ switch (valueTag) {
+ case OBJECT:
+ pointable = recordValuePool.allocate(valueType);
+ break;
+ case ARRAY:
+ case MULTISET:
+ if (openValue) {
+ //TODO(ali): a bit weird where a closed (non-tagged) list has been written in open section
+ AbstractCollectionType listType;
+ int itemTagPosition = valueStart + 1;
+ if (valueTag == ATypeTag.ARRAY) {
+ listType = new AOrderedListType(getItemType(data, itemTagPosition), "");
+ } else {
+ listType = new AUnorderedListType(getItemType(data, itemTagPosition), "");
+ }
+ pointable = listValuePool.allocate(listType);
+ } else {
+ pointable = listValuePool.allocate(valueType);
+ }
+ break;
+ default:
+ pointable = flatValuePool.allocate(null);
+ }
+ if (openValue) {
+ pointable.set(data, valueStart, valueLen);
+ } else {
+ pointable.setUntagged(data, valueStart, valueLen, valueTag);
+ }
+ return pointable;
+ }
+
+ private IAType getItemType(byte[] data, int itemTagPosition) throws HyracksDataException {
+ ATypeTag itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[itemTagPosition]);
+ if (itemTag == ATypeTag.ARRAY) {
+ return new AOrderedListType(getItemType(data, itemTagPosition + 1), "");
+ } else if (itemTag == ATypeTag.MULTISET) {
+ return new AUnorderedListType(getItemType(data, itemTagPosition + 1), "");
+ } else {
+ return TypeTagUtil.getBuiltinTypeByTag(itemTag);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"data\" : "
+ + (data == null ? "null" : ("\"" + System.identityHashCode(data) + ":" + data.length + "\""))
+ + ", \"offset\" : " + start + ", \"length\" : " + len + ", \"tagged\" : " + tagged + ", \"tag\" : \""
+ + tag + "\" }";
+ }
+
+ protected void reset() {
+ flatValuePool.reset();
+ recordValuePool.reset();
+ listValuePool.reset();
+ }
+
+ protected void free(ICastingPointable castingPointable) {
+ if (castingPointable == missingPointable || castingPointable == nullPointable) {
+ return;
+ }
+ switch (castingPointable.getType()) {
+ case FLAT:
+ flatValuePool.free((AFlatValueCastingPointable) castingPointable);
+ break;
+ case LIST:
+ listValuePool.free((AListCastingPointable) castingPointable);
+ break;
+ case RECORD:
+ recordValuePool.free((ARecordCastingPointable) castingPointable);
+ break;
+ }
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/base/ICastingPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/base/ICastingPointable.java
new file mode 100644
index 0000000..111d86e
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/base/ICastingPointable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.base;
+
+import org.apache.asterix.om.pointables.visitor.ICastingPointableVisitor;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+
+public interface ICastingPointable extends IPointable {
+
+ enum Type {
+ FLAT,
+ LIST,
+ RECORD,
+ }
+
+ void setUntagged(byte[] bytes, int start, int length, ATypeTag tag);
+
+ boolean isTagged();
+
+ <R, T> R accept(ICastingPointableVisitor<R, T> visitor, T arg) throws HyracksDataException;
+
+ Type getType();
+
+ ATypeTag getTag();
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastingPointableVisitor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastingPointableVisitor.java
new file mode 100644
index 0000000..411bda2
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastingPointableVisitor.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.cast;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.om.pointables.AFlatValueCastingPointable;
+import org.apache.asterix.om.pointables.AListCastingPointable;
+import org.apache.asterix.om.pointables.ARecordCastingPointable;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.visitor.ICastingPointableVisitor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * This class is a ICastingPointableVisitor implementation which recursively
+ * visit a given record, list or flat value of a given type, and cast it to a
+ * specified type. For example:
+ * A record { "hobby": {{"music", "coding"}}, "id": "001", "name":
+ * "Person Three"} which confirms to closed type ( id: string, name: string,
+ * hobby: {{string}}? ) can be casted to a open type (id: string )
+ * Since the open/closed part of a record has a completely different underlying
+ * memory/storage layout, the visitor will change the layout as specified at
+ * runtime.
+ */
+public class ACastingPointableVisitor implements ICastingPointableVisitor<Void, CastResult> {
+
+ private final ArrayBackedValueStorage tempBuffer = new ArrayBackedValueStorage();
+ private final ArrayBackedValueStorage castBuffer = new ArrayBackedValueStorage();
+ private final boolean strictDemote;
+ private final SourceLocation sourceLoc;
+
+ public ACastingPointableVisitor(boolean strictDemote, SourceLocation sourceLoc) {
+ this.strictDemote = strictDemote;
+ this.sourceLoc = sourceLoc;
+ }
+
+ public static ACastingPointableVisitor strictCasting(SourceLocation sourceLoc) {
+ return new ACastingPointableVisitor(true, sourceLoc);
+ }
+
+ public static ACastingPointableVisitor laxCasting(SourceLocation sourceLoc) {
+ return new ACastingPointableVisitor(false, sourceLoc);
+ }
+
+ @Override
+ public Void visit(AListCastingPointable list, CastResult castResult) throws HyracksDataException {
+ AbstractCollectionType resultType;
+ switch (castResult.getOutType().getTypeTag()) {
+ case ANY:
+ resultType = list.ordered() ? DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE
+ : DefaultOpenFieldType.NESTED_OPEN_AUNORDERED_LIST_TYPE;
+ break;
+ case ARRAY:
+ case MULTISET:
+ resultType = (AbstractCollectionType) castResult.getOutType();
+ break;
+ default:
+ throw new RuntimeDataException(ErrorCode.TYPE_CONVERT, sourceLoc,
+ list.ordered() ? ATypeTag.ARRAY : ATypeTag.MULTISET, castResult.getOutType().getTypeTag());
+ }
+
+ list.castList(castResult.getOutPointable(), resultType, this);
+ return null;
+ }
+
+ @Override
+ public Void visit(ARecordCastingPointable record, CastResult castResult) throws HyracksDataException {
+ ARecordType resultType;
+ IAType oType = castResult.getOutType();
+ switch (oType.getTypeTag()) {
+ case ANY:
+ resultType = DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE;
+ break;
+ case OBJECT:
+ resultType = (ARecordType) oType;
+ break;
+ default:
+ throw new RuntimeDataException(ErrorCode.TYPE_CONVERT, sourceLoc, ATypeTag.OBJECT, oType.getTypeTag());
+ }
+
+ record.castRecord(castResult.getOutPointable(), resultType, this);
+ return null;
+ }
+
+ @Override
+ public Void visit(AFlatValueCastingPointable flatValue, CastResult castResult) throws HyracksDataException {
+ IValueReference val;
+ if (flatValue.isTagged()) {
+ val = flatValue;
+ } else {
+ tempBuffer.reset();
+ try {
+ tempBuffer.getDataOutput().writeByte(flatValue.getTag().serialize());
+ tempBuffer.getDataOutput().write(flatValue.getByteArray(), flatValue.getStartOffset(),
+ flatValue.getLength());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ val = tempBuffer;
+ }
+ IAType oType = castResult.getOutType();
+ IPointable outPointable = castResult.getOutPointable();
+ ATypeTag reqTypeTag;
+ if (oType == null || (reqTypeTag = oType.getTypeTag()) == ATypeTag.ANY) {
+ // for open type case
+ outPointable.set(val);
+ return null;
+ }
+ byte[] valueBytes = val.getByteArray();
+ int valueOffset = val.getStartOffset();
+ ATypeTag inputTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(valueBytes[valueOffset]);
+ if (needPromoteDemote(inputTypeTag, reqTypeTag)) {
+ try {
+ castBuffer.reset();
+ ATypeHierarchy.convertNumericTypeByteArray(valueBytes, valueOffset, val.getLength(), reqTypeTag,
+ castBuffer.getDataOutput(), strictDemote);
+ outPointable.set(castBuffer);
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RuntimeDataException(ErrorCode.TYPE_CONVERT, sourceLoc, inputTypeTag, reqTypeTag);
+ }
+
+ } else {
+ outPointable.set(val);
+ }
+ return null;
+ }
+
+ private boolean needPromoteDemote(ATypeTag inTag, ATypeTag outTag) {
+ return inTag != outTag && inTag != ATypeTag.NULL && inTag != ATypeTag.MISSING;
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/visitor/ICastingPointableVisitor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/visitor/ICastingPointableVisitor.java
new file mode 100644
index 0000000..ed65b06
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/visitor/ICastingPointableVisitor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.visitor;
+
+import org.apache.asterix.om.pointables.AFlatValueCastingPointable;
+import org.apache.asterix.om.pointables.AListCastingPointable;
+import org.apache.asterix.om.pointables.ARecordCastingPointable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ICastingPointableVisitor<R, T> {
+
+ R visit(AListCastingPointable list, T arg) throws HyracksDataException;
+
+ R visit(ARecordCastingPointable record, T arg) throws HyracksDataException;
+
+ R visit(AFlatValueCastingPointable flatValue, T arg) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
index eb5275a..50f6417 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
@@ -19,14 +19,19 @@
package org.apache.asterix.runtime.evaluators.functions;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.pointables.cast.ACastVisitor;
+import org.apache.asterix.om.pointables.AFlatValueCastingPointable;
+import org.apache.asterix.om.pointables.AListCastingPointable;
+import org.apache.asterix.om.pointables.ARecordCastingPointable;
+import org.apache.asterix.om.pointables.base.ICastingPointable;
+import org.apache.asterix.om.pointables.cast.ACastingPointableVisitor;
import org.apache.asterix.om.pointables.cast.CastResult;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -36,13 +41,19 @@
public class CastTypeEvaluator implements IScalarEvaluator {
+ private final IPointable argPointable = new VoidPointable();
+ private final IObjectPool<AFlatValueCastingPointable, IAType> flatValuePool =
+ new ListObjectPool<>(AFlatValueCastingPointable.FACTORY);
+ private final IObjectPool<ARecordCastingPointable, IAType> recordValuePool =
+ new ListObjectPool<>(ARecordCastingPointable.FACTORY);
+ private final IObjectPool<AListCastingPointable, IAType> listValuePool =
+ new ListObjectPool<>(AListCastingPointable.FACTORY);
private IScalarEvaluator argEvaluator;
protected final SourceLocation sourceLoc;
- private final IPointable argPointable = new VoidPointable();
- private final PointableAllocator allocator = new PointableAllocator();
- private IVisitablePointable inputPointable;
- private final ACastVisitor castVisitor = createCastVisitor();
+ private ICastingPointable inputPointable;
+ private final ACastingPointableVisitor castVisitor = createCastVisitor();
private final CastResult castResult = new CastResult(new VoidPointable(), null);
+ private boolean inputTypeIsAny;
public CastTypeEvaluator(SourceLocation sourceLoc) {
this.sourceLoc = sourceLoc;
@@ -57,12 +68,12 @@
public void resetAndAllocate(IAType reqType, IAType inputType, IScalarEvaluator argEvaluator) {
this.argEvaluator = argEvaluator;
- this.inputPointable = allocatePointable(inputType, reqType);
+ this.inputPointable = allocatePointable(inputType);
this.castResult.setOutType(reqType);
}
- protected ACastVisitor createCastVisitor() {
- return new ACastVisitor(sourceLoc);
+ protected ACastingPointableVisitor createCastVisitor() {
+ return ACastingPointableVisitor.strictCasting(sourceLoc);
}
@Override
@@ -78,6 +89,11 @@
// TODO(ali): refactor in a better way
protected void cast(IPointable argPointable, IPointable result) throws HyracksDataException {
+ if (inputTypeIsAny) {
+ ATypeTag inTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(argPointable.getByteArray()[argPointable.getStartOffset()]);
+ inputPointable = allocateForInput(TypeTagUtil.getBuiltinTypeByTag(inTag));
+ }
inputPointable.set(argPointable);
castInto(result);
}
@@ -87,30 +103,35 @@
result.set(castResult.getOutPointable());
}
- // Allocates the result pointable.
- private IVisitablePointable allocatePointable(IAType typeForPointable, IAType typeForOtherSide) {
- if (!typeForPointable.equals(BuiltinType.ANY)) {
- return allocator.allocateFieldValue(typeForPointable);
+ private ICastingPointable allocatePointable(IAType inputType) {
+ if (!inputType.equals(BuiltinType.ANY)) {
+ inputTypeIsAny = false;
+ return allocateForInput(inputType);
+ } else {
+ inputTypeIsAny = true;
+ return null;
}
- return allocatePointableForAny(typeForOtherSide);
}
- // Allocates an input or result pointable if the input or required type is ANY.
- private IVisitablePointable allocatePointableForAny(IAType type) {
- ATypeTag tag = type.getTypeTag();
- switch (tag) {
+ private ICastingPointable allocateForInput(IAType inputType) {
+ ICastingPointable pointable;
+ switch (inputType.getTypeTag()) {
case OBJECT:
- return allocator.allocateFieldValue(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ pointable = recordValuePool.allocate(inputType);
+ break;
case ARRAY:
- return allocator.allocateFieldValue(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
case MULTISET:
- return allocator.allocateFieldValue(DefaultOpenFieldType.NESTED_OPEN_AUNORDERED_LIST_TYPE);
+ pointable = listValuePool.allocate(inputType);
+ break;
default:
- return allocator.allocateFieldValue(null);
+ pointable = flatValuePool.allocate(null);
}
+ return pointable;
}
public void deallocatePointables() {
- allocator.reset();
+ flatValuePool.reset();
+ listValuePool.reset();
+ recordValuePool.reset();
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeLaxEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeLaxEvaluator.java
index 56035a5..e25e446 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeLaxEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeLaxEvaluator.java
@@ -19,7 +19,7 @@
package org.apache.asterix.runtime.evaluators.functions;
-import org.apache.asterix.om.pointables.cast.ACastVisitor;
+import org.apache.asterix.om.pointables.cast.ACastingPointableVisitor;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -41,8 +41,8 @@
}
@Override
- protected ACastVisitor createCastVisitor() {
- return new ACastVisitor(false, sourceLoc);
+ protected ACastingPointableVisitor createCastVisitor() {
+ return ACastingPointableVisitor.laxCasting(sourceLoc);
}
@Override