[ASTERIXDB-3132][OTH] Add schema structure
- user mode changes: no
- storage format changes: no
- interface changes: no
Details:
Add the schema structure, which allows the schema to evolve (i.e., change) over time
Change-Id: I15b469ba64b3f4d561aaaff442fc92f71270a1d8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17417
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java
new file mode 100644
index 0000000..c7b4651
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public abstract class AbstractColumnImmutableMetadata extends AbstractColumnMetadata {
+ protected final IValueReference serializedMetadata;
+ protected final int numberOfColumns;
+
+ protected AbstractColumnImmutableMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+ IValueReference serializedMetadata, int numberOfColumns) {
+ super(datasetType, metaType, numberOfPrimaryKeys);
+ this.serializedMetadata = serializedMetadata;
+ this.numberOfColumns = numberOfColumns;
+ }
+
+ @Override
+ public final IValueReference serializeColumnsMetadata() {
+ return serializedMetadata;
+ }
+
+ @Override
+ public final void abort() throws HyracksDataException {
+ //NoOp as the metadata is immutable
+ }
+
+ @Override
+ public int getNumberOfColumns() {
+ return numberOfColumns;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
new file mode 100644
index 0000000..5ac38d7
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+public abstract class AbstractColumnImmutableReadMetadata extends AbstractColumnImmutableMetadata
+ implements IColumnProjectionInfo {
+ protected AbstractColumnImmutableReadMetadata(ARecordType datasetType, ARecordType metaType,
+ int numberOfPrimaryKeys, IValueReference serializedMetadata, int numberOfColumns) {
+ super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, numberOfColumns);
+ }
+
+ /**
+ * @return the corresponding reader (merge reader or query reader) given <code>this</code> metadata
+ */
+ public abstract AbstractColumnTupleReader createTupleReader();
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
new file mode 100644
index 0000000..4e19cbc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+
+public abstract class AbstractColumnMetadata implements IColumnMetadata {
+ protected static final int WRITERS_POINTER = 0;
+ protected static final int FIELD_NAMES_POINTER = WRITERS_POINTER + Integer.BYTES;
+ protected static final int SCHEMA_POINTER = FIELD_NAMES_POINTER + Integer.BYTES;
+ protected static final int META_SCHEMA_POINTER = SCHEMA_POINTER + Integer.BYTES;
+ protected static final int PATH_INFO_POINTER = META_SCHEMA_POINTER + Integer.BYTES;
+ protected static final int OFFSETS_SIZE = PATH_INFO_POINTER + Integer.BYTES;
+ private final ARecordType datasetType;
+ private final ARecordType metaType;
+
+ private final int numberOfPrimaryKeys;
+ private final int recordFieldIndex;
+
+ protected AbstractColumnMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys) {
+ this.datasetType = datasetType;
+ this.metaType = metaType;
+ this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+ this.recordFieldIndex = numberOfPrimaryKeys;
+ }
+
+ public final ARecordType getDatasetType() {
+ return datasetType;
+ }
+
+ public final ARecordType getMetaType() {
+ return metaType;
+ }
+
+ public final int getNumberOfPrimaryKeys() {
+ return numberOfPrimaryKeys;
+ }
+
+ public final int getRecordFieldIndex() {
+ return recordFieldIndex;
+ }
+
+ public final int getMetaRecordFieldIndex() {
+ return recordFieldIndex + 1;
+ }
+
+ public abstract int getNumberOfColumns();
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
new file mode 100644
index 0000000..aa2e194
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
+public class FieldNamesDictionary {
+ //For both declared and inferred fields
+ private final List<IValueReference> fieldNames;
+ private final Object2IntMap<String> declaredFieldNamesToIndexMap;
+ private final Int2IntMap hashToFieldNameIndexMap;
+ private final IBinaryHashFunction fieldNameHashFunction;
+
+ //For declared fields
+ private final AMutableString mutableString;
+ private final AStringSerializerDeserializer stringSerDer;
+
+ //For lookups
+ private final ArrayBackedValueStorage lookupStorage;
+
+ public FieldNamesDictionary() {
+ this(new ArrayList<>(), new Object2IntOpenHashMap<>(), new Int2IntOpenHashMap());
+ }
+
+ private FieldNamesDictionary(List<IValueReference> fieldNames, Object2IntMap<String> declaredFieldNamesToIndexMap,
+ Int2IntMap hashToFieldNameIndexMap) {
+ this.fieldNames = fieldNames;
+ this.declaredFieldNamesToIndexMap = declaredFieldNamesToIndexMap;
+ this.hashToFieldNameIndexMap = hashToFieldNameIndexMap;
+
+ mutableString = new AMutableString("");
+ stringSerDer = new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+ fieldNameHashFunction =
+ new PointableBinaryHashFunctionFactory(UTF8StringPointable.FACTORY).createBinaryHashFunction();
+ lookupStorage = new ArrayBackedValueStorage();
+ }
+
+ public List<IValueReference> getFieldNames() {
+ return fieldNames;
+ }
+
+ //TODO handle hash collisions (they are rare; none observed so far)
+ public int getOrCreateFieldNameIndex(IValueReference fieldName) throws HyracksDataException {
+ int hash = getHash(fieldName);
+ if (!hashToFieldNameIndexMap.containsKey(hash)) {
+ int index = addFieldName(createFieldName(fieldName), hash);
+ hashToFieldNameIndexMap.put(hash, index);
+ return index;
+ }
+ return hashToFieldNameIndexMap.get(hash);
+ }
+
+ public int getOrCreateFieldNameIndex(String fieldName) throws HyracksDataException {
+ if (!declaredFieldNamesToIndexMap.containsKey(fieldName)) {
+ IValueReference serializedFieldName = createFieldName(fieldName);
+ int hash = getHash(serializedFieldName);
+ int index = addFieldName(serializedFieldName, hash);
+ declaredFieldNamesToIndexMap.put(fieldName, index);
+ return index;
+ }
+ return declaredFieldNamesToIndexMap.getInt(fieldName);
+ }
+
+ public int getFieldNameIndex(String fieldName) throws HyracksDataException {
+ lookupStorage.reset();
+ serializeFieldName(fieldName, lookupStorage);
+ return hashToFieldNameIndexMap.getOrDefault(getHash(lookupStorage), -1);
+ }
+
+ private ArrayBackedValueStorage createFieldName(IValueReference fieldName) throws HyracksDataException {
+ ArrayBackedValueStorage copy = new ArrayBackedValueStorage(fieldName.getLength());
+ copy.append(fieldName);
+ return copy;
+ }
+
+ private ArrayBackedValueStorage createFieldName(String fieldName) throws HyracksDataException {
+ ArrayBackedValueStorage serializedFieldName = new ArrayBackedValueStorage();
+ serializeFieldName(fieldName, serializedFieldName);
+ return serializedFieldName;
+ }
+
+ private void serializeFieldName(String fieldName, ArrayBackedValueStorage storage) throws HyracksDataException {
+ mutableString.setValue(fieldName);
+ stringSerDer.serialize(mutableString, storage.getDataOutput());
+ }
+
+ private int getHash(IValueReference fieldName) throws HyracksDataException {
+ byte[] object = fieldName.getByteArray();
+ int start = fieldName.getStartOffset();
+ int length = fieldName.getLength();
+
+ return fieldNameHashFunction.hash(object, start, length);
+ }
+
+ private int addFieldName(IValueReference fieldName, int hash) {
+ int index = fieldNames.size();
+ hashToFieldNameIndexMap.put(hash, index);
+ fieldNames.add(fieldName);
+ return index;
+ }
+
+ public IValueReference getFieldName(int index) {
+ return fieldNames.get(index);
+ }
+
+ public void serialize(DataOutput output) throws IOException {
+ output.writeInt(fieldNames.size());
+ for (IValueReference fieldName : fieldNames) {
+ output.writeInt(fieldName.getLength());
+ output.write(fieldName.getByteArray(), fieldName.getStartOffset(), fieldName.getLength());
+ }
+
+ output.writeInt(declaredFieldNamesToIndexMap.size());
+ for (Object2IntMap.Entry<String> declaredFieldIndex : declaredFieldNamesToIndexMap.object2IntEntrySet()) {
+ output.writeUTF(declaredFieldIndex.getKey());
+ output.writeInt(declaredFieldIndex.getIntValue());
+ }
+
+ for (Int2IntMap.Entry hashIndex : hashToFieldNameIndexMap.int2IntEntrySet()) {
+ output.writeInt(hashIndex.getIntKey());
+ output.writeInt(hashIndex.getIntValue());
+ }
+ }
+
+ public static FieldNamesDictionary deserialize(DataInput input) throws IOException {
+ int numberOfFieldNames = input.readInt();
+
+ List<IValueReference> fieldNames = new ArrayList<>();
+ deserializeFieldNames(input, fieldNames, numberOfFieldNames);
+
+ Object2IntMap<String> declaredFieldNamesToIndexMap = new Object2IntOpenHashMap<>();
+ deserializeDeclaredFieldNames(input, declaredFieldNamesToIndexMap);
+
+ Int2IntMap hashToFieldNameIndexMap = new Int2IntOpenHashMap();
+ deserializeHashToFieldNameIndex(input, hashToFieldNameIndexMap, numberOfFieldNames);
+
+ return new FieldNamesDictionary(fieldNames, declaredFieldNamesToIndexMap, hashToFieldNameIndexMap);
+ }
+
+ public void abort(DataInputStream input) throws IOException {
+ int numberOfFieldNames = input.readInt();
+
+ fieldNames.clear();
+ deserializeFieldNames(input, fieldNames, numberOfFieldNames);
+
+ declaredFieldNamesToIndexMap.clear();
+ deserializeDeclaredFieldNames(input, declaredFieldNamesToIndexMap);
+
+ hashToFieldNameIndexMap.clear();
+ deserializeHashToFieldNameIndex(input, hashToFieldNameIndexMap, numberOfFieldNames);
+ }
+
+ private static void deserializeFieldNames(DataInput input, List<IValueReference> fieldNames, int numberOfFieldNames)
+ throws IOException {
+
+ for (int i = 0; i < numberOfFieldNames; i++) {
+ int length = input.readInt();
+ ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage(length);
+ fieldName.setSize(length);
+ input.readFully(fieldName.getByteArray(), 0, length);
+ fieldNames.add(fieldName);
+ }
+ }
+
+ private static void deserializeDeclaredFieldNames(DataInput input,
+ Object2IntMap<String> declaredFieldNamesToIndexMap) throws IOException {
+ int numberOfDeclaredFieldNames = input.readInt();
+ for (int i = 0; i < numberOfDeclaredFieldNames; i++) {
+ String fieldName = input.readUTF();
+ int fieldNameIndex = input.readInt();
+ declaredFieldNamesToIndexMap.put(fieldName, fieldNameIndex);
+ }
+ }
+
+ private static void deserializeHashToFieldNameIndex(DataInput input, Int2IntMap hashToFieldNameIndexMap,
+ int numberOfFieldNames) throws IOException {
+ for (int i = 0; i < numberOfFieldNames; i++) {
+ int hash = input.readInt();
+ int fieldNameIndex = input.readInt();
+ hashToFieldNameIndexMap.put(hash, fieldNameIndex);
+ }
+ }
+}
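
For illustration (not part of the patch), a minimal sketch of how the dictionary above can be used: declared field names are interned to stable indexes, the dictionary is serialized, and the restored copy resolves the same indexes. The class name FieldNamesDictionarySketch and the field names are illustrative; the stream plumbing is plain java.io and error handling is omitted.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.asterix.column.metadata.FieldNamesDictionary;

public class FieldNamesDictionarySketch {
    public static void main(String[] args) throws IOException {
        FieldNamesDictionary dictionary = new FieldNamesDictionary();
        // Intern declared field names; repeated names resolve to the same index
        int idIndex = dictionary.getOrCreateFieldNameIndex("id");
        int nameIndex = dictionary.getOrCreateFieldNameIndex("name");

        // Persist the dictionary ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        dictionary.serialize(new DataOutputStream(bytes));

        // ... and read it back; the restored copy resolves the same indexes for the same names
        FieldNamesDictionary restored = FieldNamesDictionary
                .deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        assert restored.getFieldNameIndex("id") == idIndex;
        assert restored.getFieldNameIndex("name") == nameIndex;
    }
}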
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java
new file mode 100644
index 0000000..f72b77b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+public class PathInfoSerializer {
+ private final ArrayBackedValueStorage primaryKeyOutputPathStorage;
+ private final ArrayBackedValueStorage pathOutputStorage;
+ private final IntList delimiters;
+ private int level;
+
+ public PathInfoSerializer() {
+ primaryKeyOutputPathStorage = new ArrayBackedValueStorage();
+ pathOutputStorage = new ArrayBackedValueStorage();
+ delimiters = new IntArrayList();
+ level = 0;
+ }
+
+ public void reset() {
+ primaryKeyOutputPathStorage.reset();
+ pathOutputStorage.reset();
+ }
+
+ public void enter(AbstractSchemaNestedNode nestedNode) {
+ if (nestedNode.isCollection()) {
+ delimiters.add(0, level - 1);
+ }
+ if (nestedNode.isObjectOrCollection()) {
+ level++;
+ }
+ }
+
+ public void exit(AbstractSchemaNestedNode nestedNode) {
+ if (nestedNode.isCollection()) {
+ delimiters.removeInt(0);
+ }
+ if (nestedNode.isObjectOrCollection()) {
+ level--;
+ }
+ }
+
+ public void writePathInfo(ATypeTag typeTag, int columnIndex, boolean primaryKey) throws IOException {
+ DataOutput output =
+ primaryKey ? primaryKeyOutputPathStorage.getDataOutput() : pathOutputStorage.getDataOutput();
+ //type tag
+ output.write(typeTag.serialize());
+ //columnIndex
+ output.writeInt(columnIndex);
+ //maxLevel
+ output.writeInt(level);
+ //is primary key
+ output.writeBoolean(primaryKey);
+ //Is collection
+ boolean collection = !delimiters.isEmpty();
+ output.writeBoolean(collection);
+ if (collection) {
+ output.writeInt(delimiters.size());
+ for (int i = 0; i < delimiters.size(); i++) {
+ output.writeInt(delimiters.getInt(i));
+ }
+ }
+ }
+
+ public void serialize(DataOutput output, int numberOfColumns) throws IOException {
+ output.writeInt(numberOfColumns);
+ output.write(primaryKeyOutputPathStorage.getByteArray(), 0, primaryKeyOutputPathStorage.getLength());
+ output.write(pathOutputStorage.getByteArray(), 0, pathOutputStorage.getLength());
+ }
+}
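
For reference (not part of the patch), a hedged sketch of a reader that mirrors the layout produced by writePathInfo and serialize above, under the assumption that exactly one path entry is written per column (primary-key entries first, then the remaining columns). The class name is illustrative.

import java.io.DataInput;
import java.io.IOException;

import org.apache.asterix.om.types.ATypeTag;

final class PathInfoReaderSketch {
    static void read(DataInput input) throws IOException {
        int numberOfColumns = input.readInt();
        for (int i = 0; i < numberOfColumns; i++) {
            // Mirrors writePathInfo: type tag, column index, max level, primary-key flag, collection flag
            ATypeTag typeTag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()];
            int columnIndex = input.readInt();
            int maxLevel = input.readInt();
            boolean primaryKey = input.readBoolean();
            boolean collection = input.readBoolean();
            if (collection) {
                // Delimiter levels of the enclosing collections
                int numberOfDelimiters = input.readInt();
                for (int d = 0; d < numberOfDelimiters; d++) {
                    input.readInt();
                }
            }
            System.out.printf("column %d: %s, maxLevel=%d, primaryKey=%b, inCollection=%b%n", columnIndex, typeTag,
                    maxLevel, primaryKey, collection);
        }
    }
}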
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
new file mode 100644
index 0000000..187e460
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+public abstract class AbstractSchemaNestedNode extends AbstractSchemaNode {
+
+ @Override
+ public final boolean isNested() {
+ return true;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
new file mode 100644
index 0000000..c9d8635
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractSchemaNode {
+ private int counter;
+
+ public abstract ATypeTag getTypeTag();
+
+ public abstract boolean isNested();
+
+ public abstract boolean isObjectOrCollection();
+
+ public abstract boolean isCollection();
+
+ public final void incrementCounter() {
+ counter++;
+ }
+
+ public final void setCounter(int counter) {
+ this.counter = counter;
+ }
+
+ public final int getCounter() {
+ return counter;
+ }
+
+ public abstract <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException;
+
+ public abstract void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException;
+
+ public static AbstractSchemaNode deserialize(DataInput input,
+ Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels) throws IOException {
+ ATypeTag typeTag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()];
+ switch (typeTag) {
+ case SYSTEM_NULL:
+ return MissingFieldSchemaNode.INSTANCE;
+ case OBJECT:
+ return new ObjectSchemaNode(input, definitionLevels);
+ case ARRAY:
+ return new ArraySchemaNode(input, definitionLevels);
+ case MULTISET:
+ return new MultisetSchemaNode(input, definitionLevels);
+ case UNION:
+ return new UnionSchemaNode(input, definitionLevels);
+ case NULL:
+ case MISSING:
+ case BOOLEAN:
+ case BIGINT:
+ case DOUBLE:
+ case STRING:
+ case UUID:
+ return new PrimitiveSchemaNode(typeTag, input);
+ default:
+ throw new UnsupportedEncodingException(typeTag + " is not supported");
+ }
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java
new file mode 100644
index 0000000..4d38156
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ISchemaNodeVisitor<R, T> {
+ R visit(ObjectSchemaNode objectNode, T arg) throws HyracksDataException;
+
+ R visit(AbstractCollectionSchemaNode collectionNode, T arg) throws HyracksDataException;
+
+ R visit(UnionSchemaNode unionNode, T arg) throws HyracksDataException;
+
+ R visit(PrimitiveSchemaNode primitiveNode, T arg) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
new file mode 100644
index 0000000..a230e86
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.util.annotations.CriticalPath;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap.Entry;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntImmutableList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+public final class ObjectSchemaNode extends AbstractSchemaNestedNode {
+ private final Int2IntMap fieldNameIndexToChildIndexMap;
+ private final List<AbstractSchemaNode> children;
+
+ public ObjectSchemaNode() {
+ fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
+ children = new ArrayList<>();
+ }
+
+ ObjectSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+ throws IOException {
+ if (definitionLevels != null) {
+ definitionLevels.put(this, new RunLengthIntArray());
+ }
+ int numberOfChildren = input.readInt();
+
+ fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
+ deserializeFieldNameIndexToChildIndex(input, fieldNameIndexToChildIndexMap, numberOfChildren);
+
+ children = new ArrayList<>();
+ deserializeChildren(input, children, numberOfChildren, definitionLevels);
+ }
+
+ public AbstractSchemaNode getOrCreateChild(IValueReference fieldName, ATypeTag childTypeTag,
+ FlushColumnMetadata columnMetadata) throws HyracksDataException {
+ int numberOfChildren = children.size();
+ int fieldNameIndex = columnMetadata.getFieldNamesDictionary().getOrCreateFieldNameIndex(fieldName);
+ int childIndex = fieldNameIndexToChildIndexMap.getOrDefault(fieldNameIndex, numberOfChildren);
+ AbstractSchemaNode currentChild = childIndex == numberOfChildren ? null : children.get(childIndex);
+ AbstractSchemaNode newChild = columnMetadata.getOrCreateChild(currentChild, childTypeTag);
+ if (currentChild == null) {
+ children.add(childIndex, newChild);
+ fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+ } else if (currentChild != newChild) {
+ children.set(childIndex, newChild);
+ }
+
+ return newChild;
+ }
+
+ public void addChild(int fieldNameIndex, AbstractSchemaNode child) {
+ int childIndex = children.size();
+ fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+ children.add(child);
+ }
+
+ public AbstractSchemaNode getChild(int fieldNameIndex) {
+ if (fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex)) {
+ return children.get(fieldNameIndexToChildIndexMap.get(fieldNameIndex));
+ }
+ return MissingFieldSchemaNode.INSTANCE;
+ }
+
+ public void removeChild(int fieldNameIndex) {
+ int childIndex = fieldNameIndexToChildIndexMap.remove(fieldNameIndex);
+ children.remove(childIndex);
+ }
+
+ public List<AbstractSchemaNode> getChildren() {
+ return children;
+ }
+
+ /**
+ * Should not be used in a {@link CriticalPath}
+ */
+ public IntList getChildrenFieldNameIndexes() {
+ return IntImmutableList.toList(fieldNameIndexToChildIndexMap.int2IntEntrySet().stream()
+ .sorted(Comparator.comparingInt(Entry::getIntValue)).mapToInt(Entry::getIntKey));
+ }
+
+ public boolean containsField(int fieldNameIndex) {
+ return fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex);
+ }
+
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.OBJECT;
+ }
+
+ @Override
+ public boolean isObjectOrCollection() {
+ return true;
+ }
+
+ @Override
+ public boolean isCollection() {
+ return false;
+ }
+
+ @Override
+ public <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+ return visitor.visit(this, arg);
+ }
+
+ @Override
+ public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+ output.write(ATypeTag.OBJECT.serialize());
+ output.writeInt(children.size());
+ for (Int2IntMap.Entry fieldNameIndexChildIndex : fieldNameIndexToChildIndexMap.int2IntEntrySet()) {
+ output.writeInt(fieldNameIndexChildIndex.getIntKey());
+ output.writeInt(fieldNameIndexChildIndex.getIntValue());
+ }
+ pathInfoSerializer.enter(this);
+ for (AbstractSchemaNode child : children) {
+ child.serialize(output, pathInfoSerializer);
+ }
+ pathInfoSerializer.exit(this);
+ }
+
+ public void abort(DataInputStream input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+ throws IOException {
+ definitionLevels.put(this, new RunLengthIntArray());
+
+ int numberOfChildren = input.readInt();
+
+ fieldNameIndexToChildIndexMap.clear();
+ deserializeFieldNameIndexToChildIndex(input, fieldNameIndexToChildIndexMap, numberOfChildren);
+
+ children.clear();
+ deserializeChildren(input, children, numberOfChildren, definitionLevels);
+ }
+
+ private static void deserializeFieldNameIndexToChildIndex(DataInput input, Int2IntMap fieldNameIndexToChildIndexMap,
+ int numberOfChildren) throws IOException {
+ for (int i = 0; i < numberOfChildren; i++) {
+ int fieldNameIndex = input.readInt();
+ int childIndex = input.readInt();
+ fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+ }
+ }
+
+ private static void deserializeChildren(DataInput input, List<AbstractSchemaNode> children, int numberOfChildren,
+ Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels) throws IOException {
+ for (int i = 0; i < numberOfChildren; i++) {
+ children.add(AbstractSchemaNode.deserialize(input, definitionLevels));
+ }
+ }
+}
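
For illustration (not part of the patch), a minimal sketch that builds a tiny schema tree with the nodes above and round-trips it: a root ObjectSchemaNode with a single BIGINT primary-key column is serialized and then restored through AbstractSchemaNode.deserialize. The field-name index 0, column index 0, and the class name SchemaTreeSketch are illustrative; passing null definition levels relies on the null checks in the deserialization constructors.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.asterix.column.metadata.PathInfoSerializer;
import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
import org.apache.asterix.om.types.ATypeTag;

public class SchemaTreeSketch {
    public static void main(String[] args) throws IOException {
        // Root object with a single BIGINT primary-key column
        ObjectSchemaNode root = new ObjectSchemaNode();
        root.addChild(0 /* field-name index */, new PrimitiveSchemaNode(0 /* column index */, ATypeTag.BIGINT, true));

        // Serialize the schema tree; PathInfoSerializer collects per-column path info as a side effect
        ByteArrayOutputStream schemaBytes = new ByteArrayOutputStream();
        PathInfoSerializer pathInfoSerializer = new PathInfoSerializer();
        root.serialize(new DataOutputStream(schemaBytes), pathInfoSerializer);

        // Restore the tree; the type tag written first drives the dispatch in AbstractSchemaNode.deserialize
        AbstractSchemaNode restored = AbstractSchemaNode
                .deserialize(new DataInputStream(new ByteArrayInputStream(schemaBytes.toByteArray())), null);
        assert restored.getTypeTag() == ATypeTag.OBJECT;
    }
}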
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
new file mode 100644
index 0000000..4c067bd
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class UnionSchemaNode extends AbstractSchemaNestedNode {
+ private final AbstractSchemaNode originalType;
+ private final Map<ATypeTag, AbstractSchemaNode> children;
+
+ public UnionSchemaNode(AbstractSchemaNode child1, AbstractSchemaNode child2) {
+ children = new EnumMap<>(ATypeTag.class);
+ originalType = child1;
+ putChild(child1);
+ putChild(child2);
+ }
+
+ UnionSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+ throws IOException {
+ if (definitionLevels != null) {
+ definitionLevels.put(this, new RunLengthIntArray());
+ }
+ ATypeTag originalTypeTag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()];
+ int numberOfChildren = input.readInt();
+ children = new EnumMap<>(ATypeTag.class);
+ for (int i = 0; i < numberOfChildren; i++) {
+ AbstractSchemaNode child = AbstractSchemaNode.deserialize(input, definitionLevels);
+ children.put(child.getTypeTag(), child);
+ }
+ originalType = children.get(originalTypeTag);
+ }
+
+ private void putChild(AbstractSchemaNode child) {
+ children.put(child.getTypeTag(), child);
+ }
+
+ public AbstractSchemaNode getOriginalType() {
+ return originalType;
+ }
+
+ public AbstractSchemaNode getOrCreateChild(ATypeTag childTypeTag, FlushColumnMetadata columnMetadata)
+ throws HyracksDataException {
+ ATypeTag normalizedTypeTag = FlushColumnMetadata.getNormalizedTypeTag(childTypeTag);
+ AbstractSchemaNode currentChild = children.get(normalizedTypeTag);
+ //The parent of a union child should be the actual parent
+ AbstractSchemaNode newChild = columnMetadata.getOrCreateChild(currentChild, normalizedTypeTag);
+ if (currentChild != newChild) {
+ putChild(newChild);
+ }
+ return newChild;
+ }
+
+ public AbstractSchemaNode getChild(ATypeTag typeTag) {
+ return children.getOrDefault(typeTag, MissingFieldSchemaNode.INSTANCE);
+ }
+
+ public Map<ATypeTag, AbstractSchemaNode> getChildren() {
+ return children;
+ }
+
+ @Override
+ public boolean isObjectOrCollection() {
+ return false;
+ }
+
+ @Override
+ public boolean isCollection() {
+ return false;
+ }
+
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.UNION;
+ }
+
+ @Override
+ public <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+ return visitor.visit(this, arg);
+ }
+
+ @Override
+ public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+ output.write(ATypeTag.UNION.serialize());
+ output.writeByte(originalType.getTypeTag().serialize());
+ output.writeInt(children.size());
+ pathInfoSerializer.enter(this);
+ for (AbstractSchemaNode child : children.values()) {
+ child.serialize(output, pathInfoSerializer);
+ }
+ pathInfoSerializer.exit(this);
+ }
+
+ /**
+ * Returns the first numeric child encountered, if any
+ *
+ * @return first numeric node or missing node
+ * @see org.apache.asterix.column.operation.query.SchemaClipperVisitor
+ */
+ public AbstractSchemaNode getNumericChildOrMissing() {
+ for (AbstractSchemaNode node : children.values()) {
+ if (ATypeHierarchy.getTypeDomain(node.getTypeTag()) == ATypeHierarchy.Domain.NUMERIC) {
+ return node;
+ }
+ }
+ return MissingFieldSchemaNode.INSTANCE;
+ }
+
+ public int getNumberOfNumericChildren() {
+ int counter = 0;
+ for (AbstractSchemaNode node : children.values()) {
+ if (ATypeHierarchy.getTypeDomain(node.getTypeTag()) == ATypeHierarchy.Domain.NUMERIC) {
+ counter++;
+ }
+ }
+
+ return counter;
+ }
+}
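
For illustration (not part of the patch), a small sketch of the union node above: child1 is kept as the original type, lookups by type tag fall back to MissingFieldSchemaNode, and numeric children can be counted. The class name and column indexes are illustrative.

import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
import org.apache.asterix.om.types.ATypeTag;

public class UnionNodeSketch {
    public static void main(String[] args) {
        // A field that was first seen as BIGINT (column 0) and later as STRING (column 1)
        UnionSchemaNode union = new UnionSchemaNode(new PrimitiveSchemaNode(0, ATypeTag.BIGINT, false),
                new PrimitiveSchemaNode(1, ATypeTag.STRING, false));

        System.out.println(union.getOriginalType().getTypeTag()); // BIGINT (child1 is the original type)
        System.out.println(union.getChild(ATypeTag.STRING).getTypeTag()); // STRING
        System.out.println(union.getChild(ATypeTag.DOUBLE) == MissingFieldSchemaNode.INSTANCE); // true
        System.out.println(union.getNumberOfNumericChildren()); // 1
    }
}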
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
new file mode 100644
index 0000000..8455864
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractCollectionSchemaNode extends AbstractSchemaNestedNode {
+ private AbstractSchemaNode item;
+
+ AbstractCollectionSchemaNode() {
+ item = null;
+ }
+
+ AbstractCollectionSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+ throws IOException {
+ if (definitionLevels != null) {
+ definitionLevels.put(this, new RunLengthIntArray());
+ }
+ item = AbstractSchemaNode.deserialize(input, definitionLevels);
+ }
+
+ public final AbstractSchemaNode getOrCreateItem(ATypeTag childTypeTag, FlushColumnMetadata columnMetadata)
+ throws HyracksDataException {
+ AbstractSchemaNode newItem = columnMetadata.getOrCreateChild(item, childTypeTag);
+ if (newItem != item) {
+ item = newItem;
+ }
+ return item;
+ }
+
+ public final AbstractSchemaNode getItemNode() {
+ return item;
+ }
+
+ public final void setItemNode(AbstractSchemaNode item) {
+ this.item = item;
+ }
+
+ @Override
+ public final <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+ return visitor.visit(this, arg);
+ }
+
+ @Override
+ public final boolean isObjectOrCollection() {
+ return true;
+ }
+
+ @Override
+ public final boolean isCollection() {
+ return true;
+ }
+
+ @Override
+ public final void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+ output.write(getTypeTag().serialize());
+ pathInfoSerializer.enter(this);
+ item.serialize(output, pathInfoSerializer);
+ pathInfoSerializer.exit(this);
+ }
+
+ public static AbstractCollectionSchemaNode create(ATypeTag typeTag) {
+ if (typeTag == ATypeTag.ARRAY) {
+ return new ArraySchemaNode();
+ }
+
+ return new MultisetSchemaNode();
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java
new file mode 100644
index 0000000..084a434
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+
+public final class ArraySchemaNode extends AbstractCollectionSchemaNode {
+
+ public ArraySchemaNode() {
+ super();
+ }
+
+ public ArraySchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+ throws IOException {
+ super(input, definitionLevels);
+ }
+
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.ARRAY;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java
new file mode 100644
index 0000000..af27a5a
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+
+public final class MultisetSchemaNode extends AbstractCollectionSchemaNode {
+ public MultisetSchemaNode() {
+ super();
+ }
+
+ public MultisetSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+ throws IOException {
+ super(input, definitionLevels);
+ }
+
+ @Override
+ public ATypeTag getTypeTag() {
+ return ATypeTag.MULTISET;
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java
new file mode 100644
index 0000000..98f408e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.primitive;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * A special schema node for a non-existent object or union field
+ */
+public final class MissingFieldSchemaNode extends PrimitiveSchemaNode {
+ public static final AbstractSchemaNode INSTANCE = new MissingFieldSchemaNode();
+
+ private MissingFieldSchemaNode() {
+ super(-1, ATypeTag.MISSING, false);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
new file mode 100644
index 0000000..28d379d
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.primitive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class PrimitiveSchemaNode extends AbstractSchemaNode {
+ private final int columnIndex;
+ private final ATypeTag typeTag;
+ private final boolean primaryKey;
+
+ public PrimitiveSchemaNode(int columnIndex, ATypeTag typeTag, boolean primaryKey) {
+ this.columnIndex = columnIndex;
+ this.typeTag = typeTag;
+ this.primaryKey = primaryKey;
+ }
+
+ public PrimitiveSchemaNode(ATypeTag typeTag, DataInput input) throws IOException {
+ this.typeTag = typeTag;
+ columnIndex = input.readInt();
+ primaryKey = input.readBoolean();
+ }
+
+ public final int getColumnIndex() {
+ return columnIndex;
+ }
+
+ @Override
+ public final ATypeTag getTypeTag() {
+ return typeTag;
+ }
+
+ @Override
+ public final boolean isNested() {
+ return false;
+ }
+
+ @Override
+ public final boolean isObjectOrCollection() {
+ return false;
+ }
+
+ @Override
+ public final boolean isCollection() {
+ return false;
+ }
+
+ public final boolean isPrimaryKey() {
+ return primaryKey;
+ }
+
+ @Override
+ public final <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+ return visitor.visit(this, arg);
+ }
+
+ @Override
+ public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+ output.write(typeTag.serialize());
+ output.writeInt(columnIndex);
+ output.writeBoolean(primaryKey);
+ pathInfoSerializer.writePathInfo(typeTag, columnIndex, primaryKey);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java
new file mode 100644
index 0000000..2917074
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.visitor;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class PathExtractorVisitor implements ISchemaNodeVisitor<AbstractSchemaNode, Void> {
+ @Override
+ public AbstractSchemaNode visit(ObjectSchemaNode objectNode, Void arg) throws HyracksDataException {
+ int fieldNameIndex = objectNode.getChildrenFieldNameIndexes().getInt(0);
+ if (fieldNameIndex < 0) {
+ return MissingFieldSchemaNode.INSTANCE;
+ }
+ return objectNode.getChild(fieldNameIndex).accept(this, null);
+ }
+
+ @Override
+ public AbstractSchemaNode visit(AbstractCollectionSchemaNode collectionNode, Void arg) throws HyracksDataException {
+ AbstractSchemaNode itemNode = collectionNode.getItemNode();
+ if (itemNode == null) {
+ return MissingFieldSchemaNode.INSTANCE;
+ }
+ return collectionNode.getItemNode().accept(this, null);
+ }
+
+ @Override
+ public AbstractSchemaNode visit(UnionSchemaNode unionNode, Void arg) throws HyracksDataException {
+ for (AbstractSchemaNode node : unionNode.getChildren().values()) {
+ // Using a 'for-loop' is the only way to get the child out of the values collection
+ return node.accept(this, null);
+ }
+ return MissingFieldSchemaNode.INSTANCE;
+ }
+
+ @Override
+ public AbstractSchemaNode visit(PrimitiveSchemaNode primitiveNode, Void arg) throws HyracksDataException {
+ //Missing column index is -1
+ return primitiveNode;
+ }
+}
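
For illustration (not part of the patch), a small sketch of driving the visitor above: starting from an object node it follows the only child at each level and returns the leaf primitive node (or MissingFieldSchemaNode). The single-field schema and the class name PathExtractorSketch are illustrative.

import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
import org.apache.asterix.column.metadata.schema.visitor.PathExtractorVisitor;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.HyracksDataException;

public class PathExtractorSketch {
    public static void main(String[] args) throws HyracksDataException {
        // A one-field schema: the path leads to a STRING column at column index 3
        ObjectSchemaNode root = new ObjectSchemaNode();
        root.addChild(0 /* field-name index */, new PrimitiveSchemaNode(3 /* column index */, ATypeTag.STRING, false));

        // The visitor walks down to the leaf of the (single) path rooted at 'root'
        AbstractSchemaNode leaf = root.accept(new PathExtractorVisitor(), null);
        System.out.println("leaf column index: " + ((PrimitiveSchemaNode) leaf).getColumnIndex()); // 3
    }
}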
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
new file mode 100644
index 0000000..fb098fa
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.visitor;
+
+import java.util.List;
+
+import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
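+/**
+ * Builds the initial column schema from the declared {@link ARecordType}, visiting the primary
+ * keys first so that they occupy the first column indexes.
+ * <p>
+ * A minimal usage sketch, mirroring how {@code FlushColumnMetadata} drives this visitor
+ * (variable names are illustrative only):
+ * <pre>{@code
+ * SchemaBuilderFromIATypeVisitor builder = new SchemaBuilderFromIATypeVisitor(columnMetadata, primaryKeys);
+ * datasetType.accept(builder, columnMetadata.getRoot());
+ * }</pre>
+ */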
+public class SchemaBuilderFromIATypeVisitor implements IATypeVisitor<Void, AbstractSchemaNode> {
+ private final FlushColumnMetadata columnMetadata;
+ private final List<List<String>> primaryKeys;
+ private List<String> currentPrimaryKeyPath;
+ private int processedPrimaryKeys;
+ private int currentPathIndex;
+
+ public SchemaBuilderFromIATypeVisitor(FlushColumnMetadata columnMetadata, List<List<String>> primaryKeys) {
+ this.columnMetadata = columnMetadata;
+ this.primaryKeys = primaryKeys;
+ processedPrimaryKeys = 0;
+ }
+
+ @Override
+ public Void visit(ARecordType recordType, AbstractSchemaNode arg) {
+ ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
+ columnMetadata.enterLevel(objectNode);
+ try {
+ if (processedPrimaryKeys < primaryKeys.size()) {
+ processPrimaryKeys(recordType, objectNode);
+ }
+ for (int i = 0; i < recordType.getFieldTypes().length; i++) {
+ processField(i, recordType, objectNode);
+ }
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ columnMetadata.exitLevel(objectNode);
+ return null;
+ }
+
+ @Override
+ public Void visit(AbstractCollectionType collectionType, AbstractSchemaNode arg) {
+ ArraySchemaNode collectionNode = (ArraySchemaNode) arg;
+ IAType itemType = collectionType.getItemType();
+ columnMetadata.enterLevel(collectionNode);
+ try {
+ AbstractSchemaNode itemNode = collectionNode.getOrCreateItem(itemType.getTypeTag(), columnMetadata);
+ itemType.accept(this, itemNode);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ columnMetadata.exitLevel(collectionNode);
+ return null;
+ }
+
+ @Override
+ public Void visit(AUnionType unionType, AbstractSchemaNode arg) {
+ throw new IllegalStateException(unionType.getTypeTag() + " is not a declared type");
+ }
+
+ @Override
+ public Void visitFlat(IAType flatType, AbstractSchemaNode arg) {
+ if (processedPrimaryKeys < primaryKeys.size()) {
+ processedPrimaryKeys++;
+ }
+ return null;
+ }
+
+ /*
+ * **************************************************************
+ * Handling the conversion of primary keys and record fields
+ * **************************************************************
+ */
+ private void processPrimaryKeys(ARecordType recordType, ObjectSchemaNode objectNode) throws HyracksDataException {
+ if (objectNode == columnMetadata.getRoot() || objectNode == columnMetadata.getMetaRoot()) {
+ while (processedPrimaryKeys < primaryKeys.size()) {
+ currentPrimaryKeyPath = primaryKeys.get(processedPrimaryKeys);
+ currentPathIndex = 0;
+ processPrimaryKeyPath(recordType, objectNode);
+ }
+ } else {
+ currentPathIndex++;
+ processPrimaryKeyPath(recordType, objectNode);
+ }
+ }
+
+ private void processPrimaryKeyPath(ARecordType recordType, ObjectSchemaNode objectNode)
+ throws HyracksDataException {
+ int fieldIndex = recordType.getFieldIndex(currentPrimaryKeyPath.get(currentPathIndex));
+ processField(fieldIndex, recordType, objectNode);
+ }
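+
+ /*
+ * Illustrative walk-through (hypothetical key): for a primary key declared as the nested path
+ * ["user", "id"], processPrimaryKeys starts at the root with currentPathIndex = 0 and descends
+ * into "user"; visiting the nested record re-enters processPrimaryKeys, which advances
+ * currentPathIndex and resolves "id". Reaching the flat leaf increments processedPrimaryKeys
+ * in visitFlat, and the loop at the root moves on to the next key.
+ */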
+
+ private void processField(int fieldIndex, ARecordType recordType, ObjectSchemaNode objectNode)
+ throws HyracksDataException {
+ IAType[] fieldTypes = recordType.getFieldTypes();
+ String[] fieldNames = recordType.getFieldNames();
+ FieldNamesDictionary dictionary = columnMetadata.getFieldNamesDictionary();
+
+ int fieldNameIndex = dictionary.getOrCreateFieldNameIndex(fieldNames[fieldIndex]);
+ IValueReference fieldName = dictionary.getFieldName(fieldNameIndex);
+
+ IAType fieldType = fieldTypes[fieldIndex];
+ AbstractSchemaNode child = objectNode.getOrCreateChild(fieldName, fieldType.getTypeTag(), columnMetadata);
+
+ fieldType.accept(this, child);
+ }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
new file mode 100644
index 0000000..8cd1e98
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.AbstractColumnMetadata;
+import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.metadata.schema.visitor.SchemaBuilderFromIATypeVisitor;
+import org.apache.asterix.column.util.ColumnValuesUtil;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.column.values.writer.AbstractColumnValuesWriter;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+/**
+ * Flush column metadata that belongs to a flushing {@link ILSMMemoryComponent}.
+ * The schema here is mutable and can change according to the flushed records.
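+ * <p>
+ * A fresh instance is built from the dataset (and optional meta) types via the public constructor,
+ * while {@link #create} rebuilds a mutable instance from previously serialized metadata
+ * (presumably when a component already carries a serialized schema).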
+ */
+public final class FlushColumnMetadata extends AbstractColumnMetadata {
+ private final Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels;
+ private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+ private final FieldNamesDictionary fieldNamesDictionary;
+ private final ObjectSchemaNode root;
+ private final ObjectSchemaNode metaRoot;
+ private final IColumnValuesWriterFactory columnWriterFactory;
+ private final List<IColumnValuesWriter> columnWriters;
+ private final ArrayBackedValueStorage serializedMetadata;
+ private final PathInfoSerializer pathInfoSerializer;
+ private final IntArrayList nullWriterIndexes;
+ private final boolean metaContainsKeys;
+ private boolean changed;
+ private int level;
+ private int repeated;
+
+ public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys,
+ List<Integer> keySourceIndicator, IColumnValuesWriterFactory columnWriterFactory,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef) throws HyracksDataException {
+ super(datasetType, metaType, primaryKeys.size());
+ this.multiPageOpRef = multiPageOpRef;
+ this.columnWriterFactory = columnWriterFactory;
+ definitionLevels = new HashMap<>();
+ columnWriters = new ArrayList<>();
+ level = -1;
+ repeated = 0;
+ fieldNamesDictionary = new FieldNamesDictionary();
+ root = new ObjectSchemaNode();
+ metaRoot = metaType != null ? new ObjectSchemaNode() : null;
+ pathInfoSerializer = new PathInfoSerializer();
+ nullWriterIndexes = new IntArrayList();
+ //Add definition levels for the root
+ addDefinitionLevelsAndGet(root);
+ SchemaBuilderFromIATypeVisitor builder = new SchemaBuilderFromIATypeVisitor(this, primaryKeys);
+ //Ensure all primary keys take the first column indexes
+ metaContainsKeys = metaType != null && keySourceIndicator.get(0) == 1;
+ if (metaContainsKeys) {
+ addDefinitionLevelsAndGet(metaRoot);
+ metaType.accept(builder, metaRoot);
+ datasetType.accept(builder, root);
+ } else {
+ datasetType.accept(builder, root);
+ if (metaRoot != null) {
+ addDefinitionLevelsAndGet(metaRoot);
+ metaType.accept(builder, metaRoot);
+ }
+ }
+
+ serializedMetadata = new ArrayBackedValueStorage();
+ changed = true;
+ serializeColumnsMetadata();
+ }
+
+ private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys,
+ boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters,
+ FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
+ Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
+ ArrayBackedValueStorage serializedMetadata) {
+ super(datasetType, metaType, primaryKeys.size());
+ this.multiPageOpRef = multiPageOpRef;
+ this.columnWriterFactory = columnWriterFactory;
+ this.definitionLevels = definitionLevels;
+ this.columnWriters = columnWriters;
+ level = -1;
+ repeated = 0;
+ this.fieldNamesDictionary = fieldNamesDictionary;
+ this.root = root;
+ this.metaRoot = metaRoot;
+ this.metaContainsKeys = metaContainsKeys;
+ pathInfoSerializer = new PathInfoSerializer();
+ nullWriterIndexes = new IntArrayList();
+ //Add definition levels for the root
+ addDefinitionLevelsAndGet(root);
+ this.serializedMetadata = serializedMetadata;
+ changed = false;
+ }
+
+ public FieldNamesDictionary getFieldNamesDictionary() {
+ return fieldNamesDictionary;
+ }
+
+ public ObjectSchemaNode getRoot() {
+ return root;
+ }
+
+ public ObjectSchemaNode getMetaRoot() {
+ return metaRoot;
+ }
+
+ public Mutable<IColumnWriteMultiPageOp> getMultiPageOpRef() {
+ return multiPageOpRef;
+ }
+
+ @Override
+ public IValueReference serializeColumnsMetadata() throws HyracksDataException {
+ if (changed) {
+ try {
+ serializeChanges();
+ changed = false;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ return serializedMetadata;
+ }
+
+ private void serializeChanges() throws IOException {
+ serializedMetadata.reset();
+ DataOutput output = serializedMetadata.getDataOutput();
+
+ int writersOffsetPointer = reserveInt(output);
+ int fieldNamesOffsetPointer = reserveInt(output);
+ int schemaOffsetPointer = reserveInt(output);
+ int metaSchemaOffsetPointer = reserveInt(output);
+ int pathInfoOffsetPointer = reserveInt(output);
+
+ //ColumnWriterInformation
+ setOffset(writersOffsetPointer);
+ output.writeInt(columnWriters.size());
+ for (IColumnValuesWriter writer : columnWriters) {
+ writer.serialize(output);
+ }
+
+ //FieldNames
+ setOffset(fieldNamesOffsetPointer);
+ fieldNamesDictionary.serialize(output);
+
+ //Schema
+ pathInfoSerializer.reset();
+ setOffset(schemaOffsetPointer);
+ root.serialize(output, pathInfoSerializer);
+ if (metaRoot != null) {
+ //Meta schema
+ setOffset(metaSchemaOffsetPointer);
+ metaRoot.serialize(output, pathInfoSerializer);
+ }
+
+ //Path info
+ setOffset(pathInfoOffsetPointer);
+ pathInfoSerializer.serialize(output, getNumberOfColumns());
+ }
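+
+ /*
+ * Resulting layout of the serialized metadata (the five leading offsets are reserved first and
+ * patched once each section's start position is known):
+ *
+ * [writersOffset][fieldNamesOffset][schemaOffset][metaSchemaOffset][pathInfoOffset]
+ * [column writers][field names dictionary][schema][meta schema (only if a meta type exists)][path info]
+ */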
+
+ private int reserveInt(DataOutput output) throws IOException {
+ int offset = serializedMetadata.getLength();
+ output.writeInt(-1);
+ return offset;
+ }
+
+ private void setOffset(int pointer) {
+ int offset = serializedMetadata.getLength();
+ IntegerPointable.setInteger(serializedMetadata.getByteArray(), pointer, offset);
+ }
+
+ public static FlushColumnMetadata create(ARecordType datasetType, ARecordType metaType,
+ List<List<String>> primaryKeys, List<Integer> keySourceIndicator,
+ IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+ IValueReference serializedMetadata) throws HyracksDataException {
+ boolean metaContainsKeys = metaType != null && keySourceIndicator.get(0) == 1;
+ try {
+ return createMutableMetadata(datasetType, metaType, primaryKeys, metaContainsKeys, columnWriterFactory,
+ multiPageOpRef, serializedMetadata);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private static FlushColumnMetadata createMutableMetadata(ARecordType datasetType, ARecordType metaType,
+ List<List<String>> primaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory,
+ Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference serializedMetadata) throws IOException {
+ DataInput input = new DataInputStream(new ByteArrayInputStream(serializedMetadata.getByteArray(),
+ serializedMetadata.getStartOffset(), serializedMetadata.getLength()));
+ //Skip offsets
+ input.skipBytes(OFFSETS_SIZE);
+
+ //ColumnWriter
+ List<IColumnValuesWriter> writers = new ArrayList<>();
+ deserializeWriters(input, writers, columnWriterFactory);
+
+ //FieldNames
+ FieldNamesDictionary fieldNamesDictionary = FieldNamesDictionary.deserialize(input);
+
+ //Schema
+ Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels = new HashMap<>();
+ ObjectSchemaNode root = (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, definitionLevels);
+ ObjectSchemaNode metaRoot = null;
+ if (metaType != null) {
+ metaRoot = (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, definitionLevels);
+ }
+
+ ArrayBackedValueStorage schemaStorage = new ArrayBackedValueStorage(serializedMetadata.getLength());
+ schemaStorage.append(serializedMetadata);
+ return new FlushColumnMetadata(datasetType, metaType, primaryKeys, metaContainsKeys, columnWriterFactory,
+ multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, definitionLevels, schemaStorage);
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ DataInputStream input = new DataInputStream(new ByteArrayInputStream(serializedMetadata.getByteArray()));
+ try {
+ abort(input);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void abort(DataInputStream input) throws IOException {
+ level = -1;
+ repeated = 0;
+ changed = false;
+
+ columnWriters.clear();
+ deserializeWriters(input, columnWriters, columnWriterFactory);
+
+ fieldNamesDictionary.abort(input);
+ definitionLevels.clear();
+ root.abort(input, definitionLevels);
+ }
+
+ public static void deserializeWriters(DataInput input, List<IColumnValuesWriter> writers,
+ IColumnValuesWriterFactory columnWriterFactory) throws IOException {
+ int numberOfWriters = input.readInt();
+ for (int i = 0; i < numberOfWriters; i++) {
+ writers.add(AbstractColumnValuesWriter.deserialize(input, columnWriterFactory));
+ }
+ }
+
+ /* ********************************************************
+ * Column values related methods
+ * ********************************************************
+ */
+
+ /**
+ * Set {@link IColumnWriteMultiPageOp} for {@link IColumnValuesWriter}
+ *
+ * @param multiPageOp multi-buffer allocator
+ */
+ public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+ multiPageOpRef.setValue(multiPageOp);
+
+ //Reset writers for the first write
+ for (int i = 0; i < columnWriters.size(); i++) {
+ columnWriters.get(i).reset();
+ }
+ }
+
+ public IColumnValuesWriter getWriter(int columnIndex) {
+ return columnWriters.get(columnIndex);
+ }
+
+ /* ********************************************************
+ * Schema related methods
+ * ********************************************************
+ */
+
+ public int getLevel() {
+ return level;
+ }
+
+ @Override
+ public int getNumberOfColumns() {
+ return columnWriters.size();
+ }
+
+ public AbstractSchemaNode getOrCreateChild(AbstractSchemaNode child, ATypeTag childTypeTag)
+ throws HyracksDataException {
+ AbstractSchemaNode currentChild = child;
+ ATypeTag normalizedTypeTag = getNormalizedTypeTag(childTypeTag);
+ if (currentChild == null || normalizedTypeTag != ATypeTag.MISSING && normalizedTypeTag != ATypeTag.NULL
+ && currentChild.getTypeTag() != ATypeTag.UNION && currentChild.getTypeTag() != normalizedTypeTag) {
+ //Create a new child (or a union) if the required type differs from the current child's type
+ currentChild = createChild(child, normalizedTypeTag);
+ //Flag that the schema has changed
+ changed = true;
+ }
+ return currentChild;
+ }
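+
+ /*
+ * Example of schema change (hypothetical field): if a field was first written as BIGINT and a
+ * later record supplies a STRING value for it, the normalized type tag no longer matches the
+ * existing child, so createChild wraps the old and new children in a UnionSchemaNode and
+ * 'changed' is set.
+ */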
+
+ public void enterLevel(AbstractSchemaNestedNode node) {
+ level++;
+ if (node.isCollection()) {
+ repeated++;
+ }
+ }
+
+ public void exitLevel(AbstractSchemaNestedNode node) {
+ level--;
+ if (node.isCollection()) {
+ repeated--;
+ }
+ }
+
+ public void enterNode(AbstractSchemaNestedNode parent, AbstractSchemaNode node) throws HyracksDataException {
+ //Flush all definition levels from parent to child
+ flushDefinitionLevels(level, parent, node);
+ if (node.isObjectOrCollection()) {
+ //Enter one more level for object, array, and multiset
+ level++;
+ if (node.isCollection()) {
+ //Tells nested values that they are repeated
+ repeated++;
+ }
+ }
+ }
+
+ public void exitNode(AbstractSchemaNode node) {
+ if (node.isNested()) {
+ //Add the nested node's level for all missing children (i.e., children that were not entered for this record)
+ definitionLevels.get((AbstractSchemaNestedNode) node).add(level);
+ if (node.isObjectOrCollection()) {
+ //Union nodes should not change the level as they are logical nodes
+ level--;
+ }
+ }
+ node.incrementCounter();
+ }
+
+ public void exitCollectionNode(AbstractCollectionSchemaNode collectionNode, int numberOfItems) {
+ RunLengthIntArray collectionDefLevels = definitionLevels.get(collectionNode);
+ //Add delimiter
+ collectionDefLevels.add(level - 1);
+ level--;
+ repeated--;
+ collectionNode.incrementCounter();
+ }
+
+ /**
+ * Needed by {@link AbstractCollectionSchemaNode} to add the definition level for each item
+ *
+ * @param collectionSchemaNode collection node
+ * @return collection node's definition level
+ */
+ public RunLengthIntArray getDefinitionLevels(AbstractCollectionSchemaNode collectionSchemaNode) {
+ return definitionLevels.get(collectionSchemaNode);
+ }
+
+ public void clearDefinitionLevels(AbstractSchemaNestedNode nestedNode) {
+ definitionLevels.get(nestedNode).reset();
+ }
+
+ public void flushDefinitionLevels(int level, AbstractSchemaNestedNode parent, AbstractSchemaNode node)
+ throws HyracksDataException {
+ if (parent != null) {
+ RunLengthIntArray parentDefLevels = definitionLevels.get(parent);
+ if (node.getCounter() < parentDefLevels.getSize()) {
+ int parentMask = ColumnValuesUtil.getNullMask(level);
+ int childMask = ColumnValuesUtil.getNullMask(level + 1);
+ flushDefinitionLevels(parentMask, childMask, parentDefLevels, node);
+ }
+ }
+ }
+
+ private void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels,
+ AbstractSchemaNode node) throws HyracksDataException {
+ int startIndex = node.getCounter();
+ if (node.isNested()) {
+ RunLengthIntArray childDefLevels = definitionLevels.get((AbstractSchemaNestedNode) node);
+ flushNestedDefinitionLevel(parentMask, childMask, startIndex, parentDefLevels, childDefLevels);
+ } else {
+ IColumnValuesWriter writer = columnWriters.get(((PrimitiveSchemaNode) node).getColumnIndex());
+ flushWriterDefinitionLevels(parentMask, childMask, startIndex, parentDefLevels, writer);
+ }
+ node.setCounter(parentDefLevels.getSize());
+ }
+
+ private void flushNestedDefinitionLevel(int parentMask, int childMask, int startIndex,
+ RunLengthIntArray parentDefLevels, RunLengthIntArray childDefLevels) {
+ if (parentDefLevels.getSize() == 0) {
+ return;
+ }
+ //First, handle the first block, as startIndex might be in the middle of a block
+ //Get the block in which startIndex resides
+ int blockIndex = parentDefLevels.getBlockIndex(startIndex);
+ //Get the remainder of the first block starting from startIndex
+ int remainingValues = parentDefLevels.getBlockSize(blockIndex, startIndex);
+
+ int firstBlockValue =
+ ColumnValuesUtil.getChildValue(parentMask, childMask, parentDefLevels.getBlockValue(blockIndex));
+ //Batch add all the remaining values
+ childDefLevels.add(firstBlockValue, remainingValues);
+
+ //Add other blocks as batches
+ for (int i = blockIndex + 1; i < parentDefLevels.getNumberOfBlocks(); i++) {
+ int blockValue = ColumnValuesUtil.getChildValue(parentMask, childMask, parentDefLevels.getBlockValue(i));
+ childDefLevels.add(blockValue, parentDefLevels.getBlockSize(i));
+ }
+ }
+
+ private void flushWriterDefinitionLevels(int parentMask, int childMask, int startIndex,
+ RunLengthIntArray parentDefLevels, IColumnValuesWriter writer) throws HyracksDataException {
+ if (parentDefLevels.getSize() == 0) {
+ return;
+ }
+ /*
+ * We might need only a fraction of the first block. Hence, we first determine how many definition level
+ * values we need. Then, we write those definition levels.
+ */
+ int blockIndex = parentDefLevels.getBlockIndex(startIndex);
+ int remainingValues = parentDefLevels.getBlockSize(blockIndex, startIndex);
+ int firstBlockValue =
+ ColumnValuesUtil.getChildValue(parentMask, childMask, parentDefLevels.getBlockValue(blockIndex));
+ writer.writeLevels(firstBlockValue, remainingValues);
+
+ //Write remaining definition levels from the remaining blocks
+ for (int i = blockIndex + 1; i < parentDefLevels.getNumberOfBlocks(); i++) {
+ int blockValue = ColumnValuesUtil.getChildValue(parentMask, childMask, parentDefLevels.getBlockValue(i));
+ writer.writeLevels(blockValue, parentDefLevels.getBlockSize(i));
+ }
+ }
+
+ private AbstractSchemaNode createChild(AbstractSchemaNode child, ATypeTag normalizedTypeTag)
+ throws HyracksDataException {
+ AbstractSchemaNode createdChild;
+ if (child != null) {
+ if (child.getTypeTag() == ATypeTag.NULL) {
+ //The previous child was a NULL. The new child needs to inherit the NULL definition levels
+ int columnIndex = ((PrimitiveSchemaNode) child).getColumnIndex();
+ RunLengthIntArray defLevels = columnWriters.get(columnIndex).getDefinitionLevelsIntArray();
+ //Add the column index to be garbage collected
+ nullWriterIndexes.add(columnIndex);
+ createdChild = createChild(normalizedTypeTag);
+ int mask = ColumnValuesUtil.getNullMask(level);
+ flushDefinitionLevels(mask, mask, defLevels, createdChild);
+ } else {
+ //Different type. Make union
+ createdChild = addDefinitionLevelsAndGet(new UnionSchemaNode(child, createChild(normalizedTypeTag)));
+ }
+ } else {
+ createdChild = createChild(normalizedTypeTag);
+ }
+ return createdChild;
+ }
+
+ private AbstractSchemaNode createChild(ATypeTag normalizedTypeTag) throws HyracksDataException {
+ switch (normalizedTypeTag) {
+ case OBJECT:
+ return addDefinitionLevelsAndGet(new ObjectSchemaNode());
+ case ARRAY:
+ return addDefinitionLevelsAndGet(new ArraySchemaNode());
+ case MULTISET:
+ return addDefinitionLevelsAndGet(new MultisetSchemaNode());
+ case NULL:
+ case MISSING:
+ case BOOLEAN:
+ case DOUBLE:
+ case BIGINT:
+ case STRING:
+ case UUID:
+ int columnIndex = nullWriterIndexes.isEmpty() ? columnWriters.size() : nullWriterIndexes.removeInt(0);
+ boolean primaryKey = columnIndex < getNumberOfPrimaryKeys();
+ boolean writeAlways = primaryKey || repeated > 0;
+ boolean filtered = !primaryKey;
+ int maxLevel = primaryKey ? 1 : level + 1;
+ IColumnValuesWriter writer = columnWriterFactory.createValueWriter(normalizedTypeTag, columnIndex,
+ maxLevel, writeAlways, filtered);
+ if (multiPageOpRef.getValue() != null) {
+ writer.reset();
+ }
+ addColumn(columnIndex, writer);
+ return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, primaryKey);
+ default:
+ throw new IllegalStateException("Unsupported type " + normalizedTypeTag);
+ }
+ }
+
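+ //Append a brand-new writer, or reuse the slot of a reclaimed NULL column (see nullWriterIndexes)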
+ private void addColumn(int index, IColumnValuesWriter writer) {
+ if (index == columnWriters.size()) {
+ columnWriters.add(writer);
+ } else {
+ columnWriters.set(index, writer);
+ }
+ }
+
+ private AbstractSchemaNode addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) {
+ definitionLevels.put(nestedNode, new RunLengthIntArray());
+ return nestedNode;
+ }
+
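+ /**
+ * Normalizes numeric type tags: all integer tags map to {@link ATypeTag#BIGINT} and FLOAT maps to
+ * {@link ATypeTag#DOUBLE}, presumably so a single column writer can accommodate any numeric width
+ * observed for the same field.
+ */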
+ public static ATypeTag getNormalizedTypeTag(ATypeTag typeTag) {
+ switch (typeTag) {
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ return ATypeTag.BIGINT;
+ case FLOAT:
+ return ATypeTag.DOUBLE;
+ default:
+ return typeTag;
+ }
+ }
+
+ public void close() {
+ //Dereference multiPageOp
+ multiPageOpRef.setValue(null);
+ for (int i = 0; i < columnWriters.size(); i++) {
+ columnWriters.get(i).close();
+ }
+ }
+
+ public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node)
+ throws HyracksDataException {
+ //Flush all definition levels from parent to the current node
+ flushDefinitionLevels(level, parent, node);
+ //Add null value (+2) to say that both the parent and the child are present
+ definitionLevels.get(node).add(ColumnValuesUtil.getNullMask(level + 2) | level);
+ node.incrementCounter();
+ }
+}