[ASTERIXDB-3132][OTH] Add schema structure

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Add the schema structure, which allows schema evolution (i.e., schema changes)

Change-Id: I15b469ba64b3f4d561aaaff442fc92f71270a1d8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17417
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java
new file mode 100644
index 0000000..c7b4651
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public abstract class AbstractColumnImmutableMetadata extends AbstractColumnMetadata { //Fields are set once
+    protected final IValueReference serializedMetadata; //Returned as-is by serializeColumnsMetadata()
+    protected final int numberOfColumns; //Returned by getNumberOfColumns()
+
+    protected AbstractColumnImmutableMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            IValueReference serializedMetadata, int numberOfColumns) {
+        super(datasetType, metaType, numberOfPrimaryKeys);
+        this.serializedMetadata = serializedMetadata; //Keeps the given reference; no copy is made
+        this.numberOfColumns = numberOfColumns;
+    }
+
+    @Override
+    public final IValueReference serializeColumnsMetadata() {
+        return serializedMetadata; //Same reference that was passed to the constructor
+    }
+
+    @Override
+    public final void abort() throws HyracksDataException {
+        //NoOp as the metadata is immutable
+    }
+
+    @Override
+    public int getNumberOfColumns() {
+        return numberOfColumns; //Value given at construction
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
new file mode 100644
index 0000000..5ac38d7
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+public abstract class AbstractColumnImmutableReadMetadata extends AbstractColumnImmutableMetadata
+        implements IColumnProjectionInfo { //Immutable metadata that is also a projection info
+    protected AbstractColumnImmutableReadMetadata(ARecordType datasetType, ARecordType metaType,
+            int numberOfPrimaryKeys, IValueReference serializedMetadata, int numberOfColumns) {
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, numberOfColumns); //No extra state here
+    }
+
+    /**
+     * @return the corresponding reader (merge reader or query reader) given <code>this</code> metadata
+     */
+    public abstract AbstractColumnTupleReader createTupleReader(); //Implemented by the concrete read metadata
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
new file mode 100644
index 0000000..4e19cbc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnMetadata.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+
+public abstract class AbstractColumnMetadata implements IColumnMetadata {
+    protected static final int WRITERS_POINTER = 0; //NOTE(review): looks like header offsets - confirm with users
+    protected static final int FIELD_NAMES_POINTER = WRITERS_POINTER + Integer.BYTES; //= 4
+    protected static final int SCHEMA_POINTER = FIELD_NAMES_POINTER + Integer.BYTES; //= 8
+    protected static final int META_SCHEMA_POINTER = SCHEMA_POINTER + Integer.BYTES; //= 12
+    protected static final int PATH_INFO_POINTER = META_SCHEMA_POINTER + Integer.BYTES; //= 16
+    protected static final int OFFSETS_SIZE = PATH_INFO_POINTER + Integer.BYTES; //= 20, five int-sized slots
+    private final ARecordType datasetType;
+    private final ARecordType metaType;
+
+    private final int numberOfPrimaryKeys;
+    private final int recordFieldIndex; //Always equal to numberOfPrimaryKeys (see constructor)
+
+    protected AbstractColumnMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys) {
+        this.datasetType = datasetType;
+        this.metaType = metaType;
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        this.recordFieldIndex = numberOfPrimaryKeys; //Record field index is set to the primary-key count
+    }
+
+    public final ARecordType getDatasetType() {
+        return datasetType;
+    }
+
+    public final ARecordType getMetaType() {
+        return metaType;
+    }
+
+    public final int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    public final int getRecordFieldIndex() {
+        return recordFieldIndex;
+    }
+
+    public final int getMetaRecordFieldIndex() {
+        return recordFieldIndex + 1; //One past the record field index; metaType is not checked here
+    }
+
+    public abstract int getNumberOfColumns(); //Supplied by subclasses
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
new file mode 100644
index 0000000..aa2e194
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
+public class FieldNamesDictionary {
+    //For both declared and inferred fields
+    private final List<IValueReference> fieldNames; //Position in this list is the field-name index
+    private final Object2IntMap<String> declaredFieldNamesToIndexMap; //Declared name -> field-name index
+    private final Int2IntMap hashToFieldNameIndexMap; //Hash of the serialized name -> field-name index
+    private final IBinaryHashFunction fieldNameHashFunction;
+
+    //For declared fields
+    private final AMutableString mutableString; //Reused holder for serializeFieldName()
+    private final AStringSerializerDeserializer stringSerDer;
+
+    //For lookups
+    private final ArrayBackedValueStorage lookupStorage; //Reset and refilled on every getFieldNameIndex() call
+
+    public FieldNamesDictionary() {
+        this(new ArrayList<>(), new Object2IntOpenHashMap<>(), new Int2IntOpenHashMap()); //Starts empty
+    }
+
+    private FieldNamesDictionary(List<IValueReference> fieldNames, Object2IntMap<String> declaredFieldNamesToIndexMap,
+            Int2IntMap hashToFieldNameIndexMap) {
+        this.fieldNames = fieldNames; //The given collections are used directly, not copied
+        this.declaredFieldNamesToIndexMap = declaredFieldNamesToIndexMap;
+        this.hashToFieldNameIndexMap = hashToFieldNameIndexMap;
+
+        mutableString = new AMutableString("");
+        stringSerDer = new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+        fieldNameHashFunction =
+                new PointableBinaryHashFunctionFactory(UTF8StringPointable.FACTORY).createBinaryHashFunction();
+        lookupStorage = new ArrayBackedValueStorage();
+    }
+
+    public List<IValueReference> getFieldNames() {
+        return fieldNames; //Live internal list, not a copy
+    }
+
+    //TODO solve collision (they're so rare that I haven't seen any)
+    public int getOrCreateFieldNameIndex(IValueReference fieldName) throws HyracksDataException {
+        int hash = getHash(fieldName);
+        if (!hashToFieldNameIndexMap.containsKey(hash)) { //Matched by hash only; name bytes are not compared
+            int index = addFieldName(creatFieldName(fieldName), hash); //Stores a copy of the caller's bytes
+            hashToFieldNameIndexMap.put(hash, index); //Repeats the put already done inside addFieldName()
+            return index;
+        }
+        return hashToFieldNameIndexMap.get(hash);
+    }
+
+    public int getOrCreateFieldNameIndex(String fieldName) throws HyracksDataException {
+        if (!declaredFieldNamesToIndexMap.containsKey(fieldName)) { //Only the declared-name map is consulted
+            IValueReference serializedFieldName = creatFieldName(fieldName);
+            int hash = getHash(serializedFieldName);
+            int index = addFieldName(serializedFieldName, hash); //Also maps hash -> index
+            declaredFieldNamesToIndexMap.put(fieldName, index);
+            return index;
+        }
+        return declaredFieldNamesToIndexMap.getInt(fieldName);
+    }
+
+    public int getFieldNameIndex(String fieldName) throws HyracksDataException {
+        lookupStorage.reset();
+        serializeFieldName(fieldName, lookupStorage);
+        return hashToFieldNameIndexMap.getOrDefault(getHash(lookupStorage), -1); //-1 if the hash is not mapped
+    }
+
+    private ArrayBackedValueStorage creatFieldName(IValueReference fieldName) throws HyracksDataException {
+        ArrayBackedValueStorage copy = new ArrayBackedValueStorage(fieldName.getLength());
+        copy.append(fieldName); //Copy, so the dictionary does not keep the caller's reference
+        return copy;
+    }
+
+    private ArrayBackedValueStorage creatFieldName(String fieldName) throws HyracksDataException {
+        ArrayBackedValueStorage serializedFieldName = new ArrayBackedValueStorage(); //New storage per name
+        serializeFieldName(fieldName, serializedFieldName);
+        return serializedFieldName;
+    }
+
+    private void serializeFieldName(String fieldName, ArrayBackedValueStorage storage) throws HyracksDataException {
+        mutableString.setValue(fieldName); //Shared holder is overwritten on each call
+        stringSerDer.serialize(mutableString, storage.getDataOutput());
+    }
+
+    private int getHash(IValueReference fieldName) throws HyracksDataException {
+        byte[] object = fieldName.getByteArray();
+        int start = fieldName.getStartOffset();
+        int length = fieldName.getLength();
+
+        return fieldNameHashFunction.hash(object, start, length); //Hash over the serialized bytes
+    }
+
+    private int addFieldName(IValueReference fieldName, int hash) {
+        int index = fieldNames.size(); //Next free index is the current list size
+        hashToFieldNameIndexMap.put(hash, index); //Replaces any index previously mapped to this hash
+        fieldNames.add(fieldName);
+        return index;
+    }
+
+    public IValueReference getFieldName(int index) {
+        return fieldNames.get(index); //No bounds check beyond the list's own
+    }
+
+    public void serialize(DataOutput output) throws IOException {
+        output.writeInt(fieldNames.size()); //1) count, then each name as (length, bytes)
+        for (IValueReference fieldName : fieldNames) {
+            output.writeInt(fieldName.getLength());
+            output.write(fieldName.getByteArray(), fieldName.getStartOffset(), fieldName.getLength());
+        }
+
+        output.writeInt(declaredFieldNamesToIndexMap.size()); //2) count, then (UTF name, index) pairs
+        for (Object2IntMap.Entry<String> declaredFieldIndex : declaredFieldNamesToIndexMap.object2IntEntrySet()) {
+            output.writeUTF(declaredFieldIndex.getKey());
+            output.writeInt(declaredFieldIndex.getIntValue());
+        }
+
+        for (Int2IntMap.Entry hashIndex : hashToFieldNameIndexMap.int2IntEntrySet()) {
+            output.writeInt(hashIndex.getIntKey()); //3) (hash, index) pairs; no count is written
+            output.writeInt(hashIndex.getIntValue()); //NOTE(review): readers expect fieldNames.size() pairs
+        }
+    }
+
+    public static FieldNamesDictionary deserialize(DataInput input) throws IOException {
+        int numberOfFieldNames = input.readInt(); //Reads the sections in the order serialize() writes them
+
+        List<IValueReference> fieldNames = new ArrayList<>();
+        deserializeFieldNames(input, fieldNames, numberOfFieldNames);
+
+        Object2IntMap<String> declaredFieldNamesToIndexMap = new Object2IntOpenHashMap<>();
+        deserializeDeclaredFieldNames(input, declaredFieldNamesToIndexMap);
+
+        Int2IntMap hashToFieldNameIndexMap = new Int2IntOpenHashMap();
+        deserializeHashToFieldNameIndex(input, hashToFieldNameIndexMap, numberOfFieldNames);
+
+        return new FieldNamesDictionary(fieldNames, declaredFieldNamesToIndexMap, hashToFieldNameIndexMap);
+    }
+
+    public void abort(DataInputStream input) throws IOException { //Reloads content in serialize() layout
+        int numberOfFieldNames = input.readInt();
+
+        fieldNames.clear(); //Each collection is cleared, then refilled in place
+        deserializeFieldNames(input, fieldNames, numberOfFieldNames);
+
+        declaredFieldNamesToIndexMap.clear();
+        deserializeDeclaredFieldNames(input, declaredFieldNamesToIndexMap);
+
+        hashToFieldNameIndexMap.clear();
+        deserializeHashToFieldNameIndex(input, hashToFieldNameIndexMap, numberOfFieldNames);
+    }
+
+    private static void deserializeFieldNames(DataInput input, List<IValueReference> fieldNames, int numberOfFieldNames)
+            throws IOException {
+
+        for (int i = 0; i < numberOfFieldNames; i++) {
+            int length = input.readInt(); //Each name is stored as (length, bytes)
+            ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage(length);
+            fieldName.setSize(length);
+            input.readFully(fieldName.getByteArray(), 0, length); //Reads straight into the backing array
+            fieldNames.add(fieldName);
+        }
+    }
+
+    private static void deserializeDeclaredFieldNames(DataInput input,
+            Object2IntMap<String> declaredFieldNamesToIndexMap) throws IOException {
+        int numberOfDeclaredFieldNames = input.readInt(); //This section carries its own count
+        for (int i = 0; i < numberOfDeclaredFieldNames; i++) {
+            String fieldName = input.readUTF();
+            int fieldNameIndex = input.readInt();
+            declaredFieldNamesToIndexMap.put(fieldName, fieldNameIndex);
+        }
+    }
+
+    private static void deserializeHashToFieldNameIndex(DataInput input, Int2IntMap hashToFieldNameIndexMap,
+            int numberOfFieldNames) throws IOException {
+        for (int i = 0; i < numberOfFieldNames; i++) { //Pair count is taken from the number of field names
+            int hash = input.readInt();
+            int fieldNameIndex = input.readInt();
+            hashToFieldNameIndexMap.put(hash, fieldNameIndex);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java
new file mode 100644
index 0000000..f72b77b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/PathInfoSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+public class PathInfoSerializer {
+    private final ArrayBackedValueStorage primaryKeyOutputPathStorage; //Entries written with primaryKey == true
+    private final ArrayBackedValueStorage pathOutputStorage; //Entries written with primaryKey == false
+    private final IntList delimiters; //One entry per currently entered collection, most recent first
+    private int level; //Number of currently entered object/collection nodes
+
+    public PathInfoSerializer() {
+        primaryKeyOutputPathStorage = new ArrayBackedValueStorage();
+        pathOutputStorage = new ArrayBackedValueStorage();
+        delimiters = new IntArrayList();
+        level = 0;
+    }
+
+    public void reset() { //NOTE(review): level and delimiters are not reset here - confirm intended
+        primaryKeyOutputPathStorage.reset();
+        pathOutputStorage.reset();
+    }
+
+    public void enter(AbstractSchemaNestedNode nestedNode) {
+        if (nestedNode.isCollection()) {
+            delimiters.add(0, level - 1); //Inserted at index 0, before level is incremented below
+        }
+        if (nestedNode.isObjectOrCollection()) {
+            level++;
+        }
+    }
+
+    public void exit(AbstractSchemaNestedNode nestedNode) {
+        if (nestedNode.isCollection()) {
+            delimiters.removeInt(0); //Removes the head entry, mirroring the insert in enter()
+        }
+        if (nestedNode.isObjectOrCollection()) {
+            level--;
+        }
+    }
+
+    public void writePathInfo(ATypeTag typeTag, int columnIndex, boolean primaryKey) throws IOException {
+        DataOutput output =
+                primaryKey ? primaryKeyOutputPathStorage.getDataOutput() : pathOutputStorage.getDataOutput();
+        //type tag
+        output.write(typeTag.serialize());
+        //columnIndex
+        output.writeInt(columnIndex);
+        //maxLevel
+        output.writeInt(level); //Current nesting level at the time of the call
+        //is primary key
+        output.writeBoolean(primaryKey);
+        //Is collection
+        boolean collection = !delimiters.isEmpty(); //True while at least one collection is entered
+        output.writeBoolean(collection);
+        if (collection) {
+            output.writeInt(delimiters.size()); //Count first, then the delimiters in list order
+            for (int i = 0; i < delimiters.size(); i++) {
+                output.writeInt(delimiters.getInt(i));
+            }
+        }
+    }
+
+    public void serialize(DataOutput output, int numberOfColumns) throws IOException {
+        output.writeInt(numberOfColumns); //Written as given; not checked against the recorded entries
+        output.write(primaryKeyOutputPathStorage.getByteArray(), 0, primaryKeyOutputPathStorage.getLength());
+        output.write(pathOutputStorage.getByteArray(), 0, pathOutputStorage.getLength()); //After the primary keys
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
new file mode 100644
index 0000000..187e460
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNestedNode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+public abstract class AbstractSchemaNestedNode extends AbstractSchemaNode {
+
+    @Override
+    public final boolean isNested() {
+        return true; //Fixed for every subclass, since the method is final
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
new file mode 100644
index 0000000..c9d8635
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractSchemaNode {
+    private int counter; //Only touched through the three final methods below
+
+    public abstract ATypeTag getTypeTag();
+
+    public abstract boolean isNested();
+
+    public abstract boolean isObjectOrCollection();
+
+    public abstract boolean isCollection();
+
+    public final void incrementCounter() {
+        counter++;
+    }
+
+    public final void setCounter(int counter) {
+        this.counter = counter;
+    }
+
+    public final int getCounter() {
+        return counter;
+    }
+
+    public abstract <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException;
+
+    public abstract void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException;
+
+    public static AbstractSchemaNode deserialize(DataInput input,
+            Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels) throws IOException {
+        ATypeTag typeTag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()]; //First byte selects the node type
+        switch (typeTag) {
+            case SYSTEM_NULL:
+                return MissingFieldSchemaNode.INSTANCE; //Shared instance; nothing more is read here
+            case OBJECT:
+                return new ObjectSchemaNode(input, definitionLevels); //Nested nodes also get definitionLevels
+            case ARRAY:
+                return new ArraySchemaNode(input, definitionLevels);
+            case MULTISET:
+                return new MultisetSchemaNode(input, definitionLevels);
+            case UNION:
+                return new UnionSchemaNode(input, definitionLevels);
+            case NULL:
+            case MISSING:
+            case BOOLEAN:
+            case BIGINT:
+            case DOUBLE:
+            case STRING:
+            case UUID:
+                return new PrimitiveSchemaNode(typeTag, input); //All tags listed above share one node class
+            default:
+                throw new UnsupportedEncodingException(typeTag + " is not supported"); //Any other tag is rejected
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java
new file mode 100644
index 0000000..4d38156
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ISchemaNodeVisitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ISchemaNodeVisitor<R, T> { //R: result type, T: type of the extra argument
+    R visit(ObjectSchemaNode objectNode, T arg) throws HyracksDataException;
+
+    R visit(AbstractCollectionSchemaNode collectionNode, T arg) throws HyracksDataException; //Abstract type
+
+    R visit(UnionSchemaNode unionNode, T arg) throws HyracksDataException;
+
+    R visit(PrimitiveSchemaNode primitiveNode, T arg) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
new file mode 100644
index 0000000..a230e86
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.util.annotations.CriticalPath;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap.Entry;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntImmutableList;
+import it.unimi.dsi.fastutil.ints.IntList;
+
+/**
+ * Schema node for an object. Children are kept in a list, and a map from field name index to the
+ * child's position in that list is used to look them up.
+ */
+public final class ObjectSchemaNode extends AbstractSchemaNestedNode {
+    // field name index -> position of the corresponding child in 'children'
+    private final Int2IntMap fieldNameIndexToChildIndexMap;
+    private final List<AbstractSchemaNode> children;
+
+    /**
+     * Creates an object node without children.
+     */
+    public ObjectSchemaNode() {
+        fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
+        children = new ArrayList<>();
+    }
+
+    /**
+     * Deserializes an object node. Reads the number of children, then that many
+     * (field name index, child index) pairs, then that many serialized children, i.e., what
+     * {@link #serialize(DataOutput, PathInfoSerializer)} writes after the type tag (the tag itself is
+     * presumably consumed by the caller before this constructor runs).
+     *
+     * @param input            serialized schema
+     * @param definitionLevels if not {@code null}, this node is registered in it with an empty array
+     */
+    ObjectSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        if (definitionLevels != null) {
+            definitionLevels.put(this, new RunLengthIntArray());
+        }
+        int numberOfChildren = input.readInt();
+
+        fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
+        deserializeFieldNameIndexToChildIndex(input, fieldNameIndexToChildIndexMap, numberOfChildren);
+
+        children = new ArrayList<>();
+        deserializeChildren(input, children, numberOfChildren, definitionLevels);
+    }
+
+    /**
+     * Returns the child for the given field name, asking {@code columnMetadata} to create it (when the
+     * field is new) or to decide whether the existing child has to be replaced.
+     *
+     * @param fieldName      name of the field
+     * @param childTypeTag   type of the value being added under that field
+     * @param columnMetadata provides the field names dictionary and creates/replaces child nodes
+     * @return the (possibly new) child node now stored for the field
+     */
+    public AbstractSchemaNode getOrCreateChild(IValueReference fieldName, ATypeTag childTypeTag,
+            FlushColumnMetadata columnMetadata) throws HyracksDataException {
+        int numberOfChildren = children.size();
+        int fieldNameIndex = columnMetadata.getFieldNamesDictionary().getOrCreateFieldNameIndex(fieldName);
+        // 'numberOfChildren' doubles as the "not found" marker and as the position of a new child
+        int childIndex = fieldNameIndexToChildIndexMap.getOrDefault(fieldNameIndex, numberOfChildren);
+        AbstractSchemaNode currentChild = childIndex == numberOfChildren ? null : children.get(childIndex);
+        AbstractSchemaNode newChild = columnMetadata.getOrCreateChild(currentChild, childTypeTag);
+        if (currentChild == null) {
+            children.add(childIndex, newChild);
+            fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+        } else if (currentChild != newChild) {
+            children.set(childIndex, newChild);
+        }
+
+        return newChild;
+    }
+
+    /**
+     * Appends a child and maps the given field name index to it.
+     */
+    public void addChild(int fieldNameIndex, AbstractSchemaNode child) {
+        int childIndex = children.size();
+        fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+        children.add(child);
+    }
+
+    /**
+     * @return the child mapped to the field name index, or {@link MissingFieldSchemaNode#INSTANCE} if
+     *         there is none
+     */
+    public AbstractSchemaNode getChild(int fieldNameIndex) {
+        if (fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex)) {
+            return children.get(fieldNameIndexToChildIndexMap.get(fieldNameIndex));
+        }
+        return MissingFieldSchemaNode.INSTANCE;
+    }
+
+    /**
+     * Removes the child mapped to the given field name index. Does nothing if the field is not present.
+     * The positions recorded for the children that followed the removed one are shifted down by one so
+     * that the map stays consistent with the list.
+     */
+    public void removeChild(int fieldNameIndex) {
+        if (!fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex)) {
+            // remove() would return the map's default value (0) and the first child would be dropped
+            return;
+        }
+        int removedChildIndex = fieldNameIndexToChildIndexMap.remove(fieldNameIndex);
+        children.remove(removedChildIndex);
+        // The list shifted every later child one position to the left; mirror that in the map
+        for (Int2IntMap.Entry fieldNameIndexChildIndex : fieldNameIndexToChildIndexMap.int2IntEntrySet()) {
+            int childIndex = fieldNameIndexChildIndex.getIntValue();
+            if (childIndex > removedChildIndex) {
+                fieldNameIndexChildIndex.setValue(childIndex - 1);
+            }
+        }
+    }
+
+    /**
+     * @return the backing list of children (not a copy)
+     */
+    public List<AbstractSchemaNode> getChildren() {
+        return children;
+    }
+
+    /**
+     * Returns the field name indexes ordered by the position of their children.
+     * Should not be used in a {@link CriticalPath}
+     */
+    public IntList getChildrenFieldNameIndexes() {
+        return IntImmutableList.toList(fieldNameIndexToChildIndexMap.int2IntEntrySet().stream()
+                .sorted(Comparator.comparingInt(Entry::getIntValue)).mapToInt(Entry::getIntKey));
+    }
+
+    /**
+     * @return whether a child is mapped to the given field name index
+     */
+    public boolean containsField(int fieldNameIndex) {
+        return fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex);
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.OBJECT;
+    }
+
+    @Override
+    public boolean isObjectOrCollection() {
+        return true;
+    }
+
+    @Override
+    public boolean isCollection() {
+        return false;
+    }
+
+    @Override
+    public <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+        return visitor.visit(this, arg);
+    }
+
+    /**
+     * Writes the OBJECT type tag, the number of children, every (field name index, child index) pair,
+     * and then each child in list order. The children are serialized between
+     * {@code pathInfoSerializer.enter(this)} and {@code pathInfoSerializer.exit(this)}.
+     */
+    @Override
+    public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+        output.write(ATypeTag.OBJECT.serialize());
+        output.writeInt(children.size());
+        for (Int2IntMap.Entry fieldNameIndexChildIndex : fieldNameIndexToChildIndexMap.int2IntEntrySet()) {
+            output.writeInt(fieldNameIndexChildIndex.getIntKey());
+            output.writeInt(fieldNameIndexChildIndex.getIntValue());
+        }
+        pathInfoSerializer.enter(this);
+        for (AbstractSchemaNode child : children) {
+            child.serialize(output, pathInfoSerializer);
+        }
+        pathInfoSerializer.exit(this);
+    }
+
+    /**
+     * Discards the current mapping and children and re-reads them from {@code input}, using the same
+     * layout as the deserializing constructor. Unlike that constructor, {@code definitionLevels} is not
+     * null-checked here.
+     *
+     * @param input            serialized schema to restore from
+     * @param definitionLevels this node is (re-)registered in it with an empty array
+     */
+    public void abort(DataInputStream input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        definitionLevels.put(this, new RunLengthIntArray());
+
+        int numberOfChildren = input.readInt();
+
+        fieldNameIndexToChildIndexMap.clear();
+        deserializeFieldNameIndexToChildIndex(input, fieldNameIndexToChildIndexMap, numberOfChildren);
+
+        children.clear();
+        deserializeChildren(input, children, numberOfChildren, definitionLevels);
+    }
+
+    // Reads 'numberOfChildren' (field name index, child index) pairs into the map
+    private static void deserializeFieldNameIndexToChildIndex(DataInput input, Int2IntMap fieldNameIndexToChildIndexMap,
+            int numberOfChildren) throws IOException {
+        for (int i = 0; i < numberOfChildren; i++) {
+            int fieldNameIndex = input.readInt();
+            int childIndex = input.readInt();
+            fieldNameIndexToChildIndexMap.put(fieldNameIndex, childIndex);
+        }
+    }
+
+    // Reads 'numberOfChildren' serialized nodes and appends them to 'children' in the order read
+    private static void deserializeChildren(DataInput input, List<AbstractSchemaNode> children, int numberOfChildren,
+            Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels) throws IOException {
+        for (int i = 0; i < numberOfChildren; i++) {
+            children.add(AbstractSchemaNode.deserialize(input, definitionLevels));
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
new file mode 100644
index 0000000..4c067bd
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Schema node for a value that was observed with more than one type. It keeps one child per type tag
+ * and remembers the child it was originally created from.
+ */
+public final class UnionSchemaNode extends AbstractSchemaNestedNode {
+    private final AbstractSchemaNode originalType;
+    private final Map<ATypeTag, AbstractSchemaNode> children;
+
+    /**
+     * Creates a union of two nodes; {@code child1} is recorded as the original type.
+     */
+    public UnionSchemaNode(AbstractSchemaNode child1, AbstractSchemaNode child2) {
+        children = new EnumMap<>(ATypeTag.class);
+        originalType = child1;
+        children.put(child1.getTypeTag(), child1);
+        children.put(child2.getTypeTag(), child2);
+    }
+
+    /**
+     * Deserializes a union node: the original type tag (one byte), the number of children, and then
+     * the children themselves.
+     *
+     * @param input            serialized schema
+     * @param definitionLevels if not {@code null}, this node is registered in it with an empty array
+     */
+    UnionSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        if (definitionLevels != null) {
+            definitionLevels.put(this, new RunLengthIntArray());
+        }
+        final ATypeTag tagOfOriginal = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()];
+        final int childCount = input.readInt();
+        children = new EnumMap<>(ATypeTag.class);
+        int read = 0;
+        while (read < childCount) {
+            final AbstractSchemaNode node = AbstractSchemaNode.deserialize(input, definitionLevels);
+            children.put(node.getTypeTag(), node);
+            read++;
+        }
+        originalType = children.get(tagOfOriginal);
+    }
+
+    /**
+     * @return the child this union was originally created from
+     */
+    public AbstractSchemaNode getOriginalType() {
+        return originalType;
+    }
+
+    /**
+     * Returns the child for the normalized form of {@code childTypeTag}, letting
+     * {@code columnMetadata} create or replace it when needed.
+     */
+    public AbstractSchemaNode getOrCreateChild(ATypeTag childTypeTag, FlushColumnMetadata columnMetadata)
+            throws HyracksDataException {
+        final ATypeTag normalizedTypeTag = FlushColumnMetadata.getNormalizedTypeTag(childTypeTag);
+        final AbstractSchemaNode existing = children.get(normalizedTypeTag);
+        //The parent of a union child should be the actual parent
+        final AbstractSchemaNode resolved = columnMetadata.getOrCreateChild(existing, normalizedTypeTag);
+        if (existing == resolved) {
+            return resolved;
+        }
+        children.put(resolved.getTypeTag(), resolved);
+        return resolved;
+    }
+
+    /**
+     * @return the child stored for the type tag, or {@link MissingFieldSchemaNode#INSTANCE} if none
+     */
+    public AbstractSchemaNode getChild(ATypeTag typeTag) {
+        return children.getOrDefault(typeTag, MissingFieldSchemaNode.INSTANCE);
+    }
+
+    /**
+     * @return the backing map of children (not a copy)
+     */
+    public Map<ATypeTag, AbstractSchemaNode> getChildren() {
+        return children;
+    }
+
+    @Override
+    public boolean isObjectOrCollection() {
+        return false;
+    }
+
+    @Override
+    public boolean isCollection() {
+        return false;
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.UNION;
+    }
+
+    @Override
+    public <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+        return visitor.visit(this, arg);
+    }
+
+    /**
+     * Writes the UNION type tag, the original child's type tag, the number of children, and then each
+     * child. The children are serialized between {@code pathInfoSerializer.enter(this)} and
+     * {@code pathInfoSerializer.exit(this)}.
+     */
+    @Override
+    public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+        output.write(ATypeTag.UNION.serialize());
+        output.writeByte(originalType.getTypeTag().serialize());
+        output.writeInt(children.size());
+        pathInfoSerializer.enter(this);
+        for (AbstractSchemaNode node : children.values()) {
+            node.serialize(output, pathInfoSerializer);
+        }
+        pathInfoSerializer.exit(this);
+    }
+
+    /**
+     * This would return any numeric node
+     *
+     * @return first numeric node or missing node
+     * @see org.apache.asterix.column.operation.query.SchemaClipperVisitor
+     */
+    public AbstractSchemaNode getNumericChildOrMissing() {
+        for (AbstractSchemaNode candidate : children.values()) {
+            if (isNumeric(candidate)) {
+                return candidate;
+            }
+        }
+        return MissingFieldSchemaNode.INSTANCE;
+    }
+
+    /**
+     * @return how many children belong to the numeric type domain
+     */
+    public int getNumberOfNumericChildren() {
+        int numericChildren = 0;
+        for (AbstractSchemaNode candidate : children.values()) {
+            numericChildren += isNumeric(candidate) ? 1 : 0;
+        }
+        return numericChildren;
+    }
+
+    // Whether the node's type tag falls in the numeric type domain
+    private static boolean isNumeric(AbstractSchemaNode node) {
+        return ATypeHierarchy.getTypeDomain(node.getTypeTag()) == ATypeHierarchy.Domain.NUMERIC;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
new file mode 100644
index 0000000..8455864
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/AbstractCollectionSchemaNode.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class of the collection schema nodes. A collection has a single item node describing its
+ * elements; the item is {@code null} until it is set or created.
+ */
+public abstract class AbstractCollectionSchemaNode extends AbstractSchemaNestedNode {
+    private AbstractSchemaNode item;
+
+    /**
+     * Creates a collection node whose item node is not set yet.
+     */
+    AbstractCollectionSchemaNode() {
+        // 'item' stays null until getOrCreateItem/setItemNode is called
+    }
+
+    /**
+     * Deserializes a collection node by reading its item node from {@code input}.
+     *
+     * @param input            serialized schema
+     * @param definitionLevels if not {@code null}, this node is registered in it with an empty array
+     */
+    AbstractCollectionSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        if (definitionLevels != null) {
+            definitionLevels.put(this, new RunLengthIntArray());
+        }
+        this.item = AbstractSchemaNode.deserialize(input, definitionLevels);
+    }
+
+    /**
+     * Lets {@code columnMetadata} create the item node, or decide whether the current one must be
+     * replaced, and stores whatever it returns.
+     *
+     * @return the item node now held by this collection
+     */
+    public final AbstractSchemaNode getOrCreateItem(ATypeTag childTypeTag, FlushColumnMetadata columnMetadata)
+            throws HyracksDataException {
+        this.item = columnMetadata.getOrCreateChild(this.item, childTypeTag);
+        return this.item;
+    }
+
+    /**
+     * @return the item node, possibly {@code null}
+     */
+    public final AbstractSchemaNode getItemNode() {
+        return this.item;
+    }
+
+    public final void setItemNode(AbstractSchemaNode item) {
+        this.item = item;
+    }
+
+    @Override
+    public final <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+        return visitor.visit(this, arg);
+    }
+
+    @Override
+    public final boolean isObjectOrCollection() {
+        return true;
+    }
+
+    @Override
+    public final boolean isCollection() {
+        return true;
+    }
+
+    /**
+     * Writes this node's type tag followed by the item node. The item is serialized between
+     * {@code pathInfoSerializer.enter(this)} and {@code pathInfoSerializer.exit(this)}.
+     */
+    @Override
+    public final void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+        output.write(getTypeTag().serialize());
+        pathInfoSerializer.enter(this);
+        this.item.serialize(output, pathInfoSerializer);
+        pathInfoSerializer.exit(this);
+    }
+
+    /**
+     * @return an {@link ArraySchemaNode} for {@link ATypeTag#ARRAY}, a {@link MultisetSchemaNode} for
+     *         any other type tag
+     */
+    public static AbstractCollectionSchemaNode create(ATypeTag typeTag) {
+        return typeTag == ATypeTag.ARRAY ? new ArraySchemaNode() : new MultisetSchemaNode();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java
new file mode 100644
index 0000000..084a434
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/ArraySchemaNode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Collection schema node whose type tag is {@link ATypeTag#ARRAY}.
+ */
+public final class ArraySchemaNode extends AbstractCollectionSchemaNode {
+
+    /**
+     * Creates an array node through the no-argument super constructor.
+     */
+    public ArraySchemaNode() {
+        // The no-argument super constructor is invoked implicitly
+    }
+
+    /**
+     * Deserializes an array node; all reading is done by the super constructor.
+     */
+    public ArraySchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        super(input, definitionLevels);
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.ARRAY;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java
new file mode 100644
index 0000000..af27a5a
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/collection/MultisetSchemaNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.collection;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Collection schema node whose type tag is {@link ATypeTag#MULTISET}.
+ */
+public final class MultisetSchemaNode extends AbstractCollectionSchemaNode {
+    /**
+     * Creates a multiset node through the no-argument super constructor.
+     */
+    public MultisetSchemaNode() {
+        // The no-argument super constructor is invoked implicitly
+    }
+
+    /**
+     * Deserializes a multiset node; all reading is done by the super constructor.
+     */
+    public MultisetSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
+            throws IOException {
+        super(input, definitionLevels);
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.MULTISET;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java
new file mode 100644
index 0000000..98f408e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/MissingFieldSchemaNode.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.primitive;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * A special schema node that stands for a non-existing object or union field.
+ * It is a primitive node with column index -1 and type tag {@link ATypeTag#MISSING}.
+ */
+public final class MissingFieldSchemaNode extends PrimitiveSchemaNode {
+    // The only instance; the constructor is private
+    public static final AbstractSchemaNode INSTANCE = new MissingFieldSchemaNode();
+
+    private MissingFieldSchemaNode() {
+        // columnIndex = -1, typeTag = MISSING, primaryKey = false
+        super(-1, ATypeTag.MISSING, false);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
new file mode 100644
index 0000000..28d379d
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/primitive/PrimitiveSchemaNode.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.primitive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Leaf schema node. It records the index of the column it is stored in, its type tag, and whether it
+ * is a primary key.
+ */
+public class PrimitiveSchemaNode extends AbstractSchemaNode {
+    private final int columnIndex;
+    private final ATypeTag typeTag;
+    private final boolean primaryKey;
+
+    /**
+     * @param columnIndex index of the column of this node
+     * @param typeTag     type of the values
+     * @param primaryKey  whether this node is a primary key
+     */
+    public PrimitiveSchemaNode(int columnIndex, ATypeTag typeTag, boolean primaryKey) {
+        this.typeTag = typeTag;
+        this.columnIndex = columnIndex;
+        this.primaryKey = primaryKey;
+    }
+
+    /**
+     * Deserializes a primitive node: reads the column index (int) and then the primary key flag
+     * (boolean). The type tag is supplied by the caller.
+     */
+    public PrimitiveSchemaNode(ATypeTag typeTag, DataInput input) throws IOException {
+        this.typeTag = typeTag;
+        this.columnIndex = input.readInt();
+        this.primaryKey = input.readBoolean();
+    }
+
+    public final int getColumnIndex() {
+        return this.columnIndex;
+    }
+
+    public final boolean isPrimaryKey() {
+        return this.primaryKey;
+    }
+
+    @Override
+    public final ATypeTag getTypeTag() {
+        return this.typeTag;
+    }
+
+    @Override
+    public final boolean isNested() {
+        return false;
+    }
+
+    @Override
+    public final boolean isObjectOrCollection() {
+        return false;
+    }
+
+    @Override
+    public final boolean isCollection() {
+        return false;
+    }
+
+    @Override
+    public final <R, T> R accept(ISchemaNodeVisitor<R, T> visitor, T arg) throws HyracksDataException {
+        return visitor.visit(this, arg);
+    }
+
+    /**
+     * Writes the type tag, the column index, and the primary key flag, and then reports the same three
+     * values to {@code pathInfoSerializer}.
+     */
+    @Override
+    public void serialize(DataOutput output, PathInfoSerializer pathInfoSerializer) throws IOException {
+        output.write(this.typeTag.serialize());
+        output.writeInt(this.columnIndex);
+        output.writeBoolean(this.primaryKey);
+        pathInfoSerializer.writePathInfo(this.typeTag, this.columnIndex, this.primaryKey);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java
new file mode 100644
index 0000000..2917074
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/PathExtractorVisitor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.visitor;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Walks down a schema by following the first child at every nested node and returns the primitive node
+ * it ends at. {@link MissingFieldSchemaNode#INSTANCE} is returned when a nested node has no child to
+ * follow.
+ */
+public class PathExtractorVisitor implements ISchemaNodeVisitor<AbstractSchemaNode, Void> {
+    /**
+     * Follows the object's first child (the one at position 0).
+     *
+     * @return the node reached through that child, or the missing node if the object has no children
+     */
+    @Override
+    public AbstractSchemaNode visit(ObjectSchemaNode objectNode, Void arg) throws HyracksDataException {
+        if (objectNode.getChildren().isEmpty()) {
+            // No field to follow; asking for element 0 of the (empty) field name indexes would throw
+            return MissingFieldSchemaNode.INSTANCE;
+        }
+        int fieldNameIndex = objectNode.getChildrenFieldNameIndexes().getInt(0);
+        if (fieldNameIndex < 0) {
+            return MissingFieldSchemaNode.INSTANCE;
+        }
+        return objectNode.getChild(fieldNameIndex).accept(this, null);
+    }
+
+    /**
+     * Follows the collection's item node.
+     *
+     * @return the node reached through the item, or the missing node if the item is not set
+     */
+    @Override
+    public AbstractSchemaNode visit(AbstractCollectionSchemaNode collectionNode, Void arg) throws HyracksDataException {
+        AbstractSchemaNode itemNode = collectionNode.getItemNode();
+        if (itemNode == null) {
+            return MissingFieldSchemaNode.INSTANCE;
+        }
+        return itemNode.accept(this, null);
+    }
+
+    /**
+     * Follows the first child the union's children map yields.
+     *
+     * @return the node reached through that child, or the missing node if the union has no children
+     */
+    @Override
+    public AbstractSchemaNode visit(UnionSchemaNode unionNode, Void arg) throws HyracksDataException {
+        for (AbstractSchemaNode node : unionNode.getChildren().values()) {
+            // A 'for-loop' that returns on its first iteration is used to take the first child out of the collection
+            return node.accept(this, null);
+        }
+        return MissingFieldSchemaNode.INSTANCE;
+    }
+
+    /**
+     * Ends the walk: the primitive node itself is the result.
+     */
+    @Override
+    public AbstractSchemaNode visit(PrimitiveSchemaNode primitiveNode, Void arg) throws HyracksDataException {
+        //Missing column index is -1
+        return primitiveNode;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
new file mode 100644
index 0000000..fb098fa
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.schema.visitor;
+
+import java.util.List;
+
+import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.IATypeVisitor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class SchemaBuilderFromIATypeVisitor implements IATypeVisitor<Void, AbstractSchemaNode> {
+    //Metadata whose schema nodes are created while visiting the declared types
+    private final FlushColumnMetadata columnMetadata;
+    //Primary key paths; each path is a list of field names
+    private final List<List<String>> primaryKeys;
+    //The primary key path currently being followed (an element of primaryKeys)
+    private List<String> currentPrimaryKeyPath;
+    //Number of primary key paths counted as processed (incremented by visitFlat); index of the next path to follow
+    private int processedPrimaryKeys;
+    //Position, within currentPrimaryKeyPath, of the field name being followed
+    private int currentPathIndex;
+
+    /**
+     * @param columnMetadata metadata that receives the schema nodes built by this visitor
+     * @param primaryKeys    primary key paths; none of them is considered processed initially
+     */
+    public SchemaBuilderFromIATypeVisitor(FlushColumnMetadata columnMetadata, List<List<String>> primaryKeys) {
+        this.processedPrimaryKeys = 0;
+        this.primaryKeys = primaryKeys;
+        this.columnMetadata = columnMetadata;
+    }
+
+    @Override
+    public Void visit(ARecordType recordType, AbstractSchemaNode arg) {
+        ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
+        columnMetadata.enterLevel(objectNode);
+        try {
+            if (processedPrimaryKeys < primaryKeys.size()) {
+                processPrimaryKeys(recordType, objectNode);
+            }
+            for (int i = 0; i < recordType.getFieldTypes().length; i++) {
+                processField(i, recordType, objectNode);
+            }
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+        columnMetadata.exitLevel(objectNode);
+        return null;
+    }
+
+    /**
+     * Creates (or reuses) the item node of the collection node {@code arg} for the declared item type and then
+     * visits that item type.
+     * NOTE(review): {@code arg} is cast to {@link ArraySchemaNode}; presumably a declared multiset type would
+     * arrive here with a different node class -- confirm against the node hierarchy.
+     *
+     * @throws IllegalStateException wrapping any {@link HyracksDataException} raised while building the item
+     */
+    @Override
+    public Void visit(AbstractCollectionType collectionType, AbstractSchemaNode arg) {
+        ArraySchemaNode arrayNode = (ArraySchemaNode) arg;
+        IAType declaredItemType = collectionType.getItemType();
+        columnMetadata.enterLevel(arrayNode);
+        try {
+            declaredItemType.accept(this, arrayNode.getOrCreateItem(declaredItemType.getTypeTag(), columnMetadata));
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+        columnMetadata.exitLevel(arrayNode);
+        return null;
+    }
+
+    @Override
+    public Void visit(AUnionType unionType, AbstractSchemaNode arg) {
+        throw new IllegalStateException(unionType.getTypeTag() + " is not a declared type");
+    }
+
+    @Override
+    public Void visitFlat(IAType flatType, AbstractSchemaNode arg) {
+        if (processedPrimaryKeys < primaryKeys.size()) {
+            processedPrimaryKeys++;
+        }
+        return null;
+    }
+
+    /*
+     * **************************************************************
+     * Handling primary keys and record fields conversion
+     * **************************************************************
+     */
+    /**
+     * Follows the primary key paths through {@code recordType}.
+     * At the root (or meta root) every remaining primary key path is started from its first field name; the
+     * loop advances because {@code visitFlat} increments {@code processedPrimaryKeys}. At any other object,
+     * the current path is simply continued with its next field name.
+     */
+    private void processPrimaryKeys(ARecordType recordType, ObjectSchemaNode objectNode) throws HyracksDataException {
+        boolean atRoot = objectNode == columnMetadata.getRoot() || objectNode == columnMetadata.getMetaRoot();
+        if (!atRoot) {
+            //Nested object: move to the next field name of the path being followed
+            currentPathIndex++;
+            processPrimaryKeyPath(recordType, objectNode);
+            return;
+        }
+        while (processedPrimaryKeys < primaryKeys.size()) {
+            currentPrimaryKeyPath = primaryKeys.get(processedPrimaryKeys);
+            currentPathIndex = 0;
+            processPrimaryKeyPath(recordType, objectNode);
+        }
+    }
+
+    /**
+     * Processes the field of {@code recordType} named by the current position of the current primary key path.
+     */
+    private void processPrimaryKeyPath(ARecordType recordType, ObjectSchemaNode objectNode)
+            throws HyracksDataException {
+        String keyFieldName = currentPrimaryKeyPath.get(currentPathIndex);
+        processField(recordType.getFieldIndex(keyFieldName), recordType, objectNode);
+    }
+
+    /**
+     * Registers the name of the field at {@code fieldIndex} in the field-names dictionary, gets or creates the
+     * matching child of {@code objectNode} for the field's declared type tag, and visits the declared type
+     * with that child.
+     */
+    private void processField(int fieldIndex, ARecordType recordType, ObjectSchemaNode objectNode)
+            throws HyracksDataException {
+        String declaredName = recordType.getFieldNames()[fieldIndex];
+        IAType declaredType = recordType.getFieldTypes()[fieldIndex];
+
+        FieldNamesDictionary names = columnMetadata.getFieldNamesDictionary();
+        IValueReference serializedName = names.getFieldName(names.getOrCreateFieldNameIndex(declaredName));
+
+        AbstractSchemaNode fieldNode =
+                objectNode.getOrCreateChild(serializedName, declaredType.getTypeTag(), columnMetadata);
+        declaredType.accept(this, fieldNode);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
new file mode 100644
index 0000000..8cd1e98
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.column.metadata.AbstractColumnMetadata;
+import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.PathInfoSerializer;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.MultisetSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.metadata.schema.visitor.SchemaBuilderFromIATypeVisitor;
+import org.apache.asterix.column.util.ColumnValuesUtil;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.column.values.writer.AbstractColumnValuesWriter;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+/**
+ * Flush column metadata belongs to a flushing {@link ILSMMemoryComponent}
+ * The schema here is mutable and can change according to the flushed records
+ */
+public final class FlushColumnMetadata extends AbstractColumnMetadata {
+    //Definition levels kept per nested schema node
+    private final Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels;
+    //Holder of the multi-page operation; set by init() and cleared by close()
+    private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+    private final FieldNamesDictionary fieldNamesDictionary;
+    private final ObjectSchemaNode root;
+    //null when no meta type is given
+    private final ObjectSchemaNode metaRoot;
+    private final IColumnValuesWriterFactory columnWriterFactory;
+    //One writer per column; the list position is the column index
+    private final List<IColumnValuesWriter> columnWriters;
+    //Last serialized form of this metadata (see serializeChanges())
+    private final ArrayBackedValueStorage serializedMetadata;
+    private final PathInfoSerializer pathInfoSerializer;
+    //Column indexes of NULL-typed columns that were replaced; reused by the next primitive column created
+    private final IntArrayList nullWriterIndexes;
+    //true when a meta type is given and the first key source indicator is 1
+    private final boolean metaContainsKeys;
+    //true when the metadata must be re-serialized
+    private boolean changed;
+    //Current nesting level; -1 when no node is entered
+    private int level;
+    //Number of collection nodes currently entered
+    private int repeated;
+
+    /**
+     * Creates the metadata from the declared types and serializes it once.
+     *
+     * @param datasetType         declared type of the records
+     * @param metaType            declared type of the meta records, or {@code null} if there is none
+     * @param primaryKeys         primary key paths
+     * @param keySourceIndicator  only its first entry is consulted (and only when {@code metaType} is not
+     *                            {@code null}): 1 means the meta type is visited before the dataset type
+     * @param columnWriterFactory factory of the column writers
+     * @param multiPageOpRef      holder of the multi-page operation used by the writers
+     */
+    public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys,
+            List<Integer> keySourceIndicator, IColumnValuesWriterFactory columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef) throws HyracksDataException {
+        super(datasetType, metaType, primaryKeys.size());
+        this.multiPageOpRef = multiPageOpRef;
+        this.columnWriterFactory = columnWriterFactory;
+        this.definitionLevels = new HashMap<>();
+        this.columnWriters = new ArrayList<>();
+        this.fieldNamesDictionary = new FieldNamesDictionary();
+        this.pathInfoSerializer = new PathInfoSerializer();
+        this.nullWriterIndexes = new IntArrayList();
+        this.serializedMetadata = new ArrayBackedValueStorage();
+        this.root = new ObjectSchemaNode();
+        this.metaRoot = metaType == null ? null : new ObjectSchemaNode();
+        this.metaContainsKeys = metaType != null && keySourceIndicator.get(0) == 1;
+        this.level = -1;
+        this.repeated = 0;
+
+        //Add definition levels for the root
+        addDefinitionLevelsAndGet(root);
+        SchemaBuilderFromIATypeVisitor builder = new SchemaBuilderFromIATypeVisitor(this, primaryKeys);
+        //Ensure all primary keys take the first column indexes: the type that holds the keys is visited first
+        if (metaContainsKeys) {
+            addDefinitionLevelsAndGet(metaRoot);
+            metaType.accept(builder, metaRoot);
+        }
+        datasetType.accept(builder, root);
+        if (!metaContainsKeys && metaRoot != null) {
+            addDefinitionLevelsAndGet(metaRoot);
+            metaType.accept(builder, metaRoot);
+        }
+
+        changed = true;
+        serializeColumnsMetadata();
+    }
+
+    /**
+     * Assembles the metadata from already deserialized parts (see {@code createMutableMetadata}). The
+     * metadata starts as unchanged, so {@code serializedMetadata} is returned as is until the schema changes.
+     */
+    private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys,
+            boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters,
+            FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
+            Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
+            ArrayBackedValueStorage serializedMetadata) {
+        super(datasetType, metaType, primaryKeys.size());
+        //Parts handed over by the caller
+        this.multiPageOpRef = multiPageOpRef;
+        this.columnWriterFactory = columnWriterFactory;
+        this.columnWriters = columnWriters;
+        this.fieldNamesDictionary = fieldNamesDictionary;
+        this.root = root;
+        this.metaRoot = metaRoot;
+        this.metaContainsKeys = metaContainsKeys;
+        this.definitionLevels = definitionLevels;
+        this.serializedMetadata = serializedMetadata;
+        //Fresh state
+        this.pathInfoSerializer = new PathInfoSerializer();
+        this.nullWriterIndexes = new IntArrayList();
+        this.level = -1;
+        this.repeated = 0;
+        this.changed = false;
+        //Add definition levels for the root
+        addDefinitionLevelsAndGet(root);
+    }
+
+    /** @return the dictionary of field names used by the schema */
+    public FieldNamesDictionary getFieldNamesDictionary() {
+        return this.fieldNamesDictionary;
+    }
+
+    /** @return the root node of the schema */
+    public ObjectSchemaNode getRoot() {
+        return this.root;
+    }
+
+    /** @return the root node of the meta schema, or {@code null} if no meta type was given */
+    public ObjectSchemaNode getMetaRoot() {
+        return this.metaRoot;
+    }
+
+    /** @return the holder of the multi-page operation */
+    public Mutable<IColumnWriteMultiPageOp> getMultiPageOpRef() {
+        return this.multiPageOpRef;
+    }
+
+    /**
+     * Returns the serialized metadata, re-serializing it first if the metadata changed since the last call.
+     *
+     * @throws HyracksDataException wrapping any {@link IOException} raised during serialization
+     */
+    @Override
+    public IValueReference serializeColumnsMetadata() throws HyracksDataException {
+        if (!changed) {
+            return serializedMetadata;
+        }
+        try {
+            serializeChanges();
+            changed = false;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return serializedMetadata;
+    }
+
+    /**
+     * Rewrites {@code serializedMetadata} from scratch. The output starts with five reserved integers that are
+     * later patched to hold the start offset of each section, followed by the sections themselves in this
+     * order: column writers, field names, schema, meta schema (only if there is a meta root) and path info.
+     * The slot of a section that is not written keeps its reserved value of -1.
+     */
+    private void serializeChanges() throws IOException {
+        serializedMetadata.reset();
+        DataOutput output = serializedMetadata.getDataOutput();
+
+        //Reserve one offset slot per section; the order of the slots matches the order of the sections
+        int writersOffsetPointer = reserveInt(output);
+        int fieldNamesOffsetPointer = reserveInt(output);
+        int schemaOffsetPointer = reserveInt(output);
+        int metaSchemaOffsetPointer = reserveInt(output);
+        int pathInfoOffsetPointer = reserveInt(output);
+
+        //ColumnWriterInformation
+        setOffset(writersOffsetPointer);
+        output.writeInt(columnWriters.size());
+        for (IColumnValuesWriter writer : columnWriters) {
+            writer.serialize(output);
+        }
+
+        //FieldNames
+        setOffset(fieldNamesOffsetPointer);
+        fieldNamesDictionary.serialize(output);
+
+        //Schema
+        pathInfoSerializer.reset();
+        setOffset(schemaOffsetPointer);
+        root.serialize(output, pathInfoSerializer);
+        if (metaRoot != null) {
+            //Meta schema
+            setOffset(metaSchemaOffsetPointer);
+            metaRoot.serialize(output, pathInfoSerializer);
+        }
+
+        //Path info
+        setOffset(pathInfoOffsetPointer);
+        pathInfoSerializer.serialize(output, getNumberOfColumns());
+    }
+
+    private int reserveInt(DataOutput output) throws IOException {
+        int offset = serializedMetadata.getLength();
+        output.writeInt(-1);
+        return offset;
+    }
+
+    /**
+     * Stores the current length of {@code serializedMetadata} in the integer slot located at {@code pointer}.
+     */
+    private void setOffset(int pointer) {
+        byte[] bytes = serializedMetadata.getByteArray();
+        IntegerPointable.setInteger(bytes, pointer, serializedMetadata.getLength());
+    }
+
+    /**
+     * Creates the metadata from a previously serialized form.
+     *
+     * @param keySourceIndicator only its first entry is consulted, and only when {@code metaType} is not
+     *                           {@code null}
+     * @param serializedMetadata serialized metadata to read from
+     * @throws HyracksDataException wrapping any {@link IOException} raised while deserializing
+     */
+    public static FlushColumnMetadata create(ARecordType datasetType, ARecordType metaType,
+            List<List<String>> primaryKeys, List<Integer> keySourceIndicator,
+            IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+            IValueReference serializedMetadata) throws HyracksDataException {
+        final boolean keysInMeta = metaType != null && keySourceIndicator.get(0) == 1;
+        FlushColumnMetadata metadata;
+        try {
+            metadata = createMutableMetadata(datasetType, metaType, primaryKeys, keysInMeta, columnWriterFactory,
+                    multiPageOpRef, serializedMetadata);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return metadata;
+    }
+
+    /**
+     * Deserializes the column writers, the field names and the schema (plus the meta schema when
+     * {@code metaType} is not {@code null}) from {@code serializedMetadata}, and keeps a copy of the
+     * serialized bytes as the initial serialized form of the returned metadata.
+     */
+    private static FlushColumnMetadata createMutableMetadata(ARecordType datasetType, ARecordType metaType,
+            List<List<String>> primaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, IValueReference serializedMetadata) throws IOException {
+        ByteArrayInputStream bytes = new ByteArrayInputStream(serializedMetadata.getByteArray(),
+                serializedMetadata.getStartOffset(), serializedMetadata.getLength());
+        DataInput input = new DataInputStream(bytes);
+        //Skip offsets
+        input.skipBytes(OFFSETS_SIZE);
+
+        //ColumnWriter
+        List<IColumnValuesWriter> deserializedWriters = new ArrayList<>();
+        deserializeWriters(input, deserializedWriters, columnWriterFactory);
+
+        //FieldNames
+        FieldNamesDictionary dictionary = FieldNamesDictionary.deserialize(input);
+
+        //Schema, then the meta schema (if any)
+        Map<AbstractSchemaNestedNode, RunLengthIntArray> defLevels = new HashMap<>();
+        ObjectSchemaNode schemaRoot = (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, defLevels);
+        ObjectSchemaNode metaSchemaRoot =
+                metaType == null ? null : (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, defLevels);
+
+        //Keep a private copy of the serialized bytes
+        ArrayBackedValueStorage serializedCopy = new ArrayBackedValueStorage(serializedMetadata.getLength());
+        serializedCopy.append(serializedMetadata);
+        return new FlushColumnMetadata(datasetType, metaType, primaryKeys, metaContainsKeys, columnWriterFactory,
+                multiPageOpRef, deserializedWriters, dictionary, schemaRoot, metaSchemaRoot, defLevels,
+                serializedCopy);
+    }
+
+    /**
+     * Rolls this metadata back to its last serialized form.
+     * The serialized form starts with a header of section offsets (written by {@code serializeChanges()});
+     * that header is skipped, exactly as {@code createMutableMetadata()} does, so that reading starts at the
+     * column writers section. The stream is also limited to the valid region of the backing array.
+     *
+     * @throws HyracksDataException wrapping any {@link IOException} raised while reading the serialized form
+     */
+    @Override
+    public void abort() throws HyracksDataException {
+        DataInputStream input = new DataInputStream(new ByteArrayInputStream(serializedMetadata.getByteArray(),
+                serializedMetadata.getStartOffset(), serializedMetadata.getLength()));
+        try {
+            //Skip offsets; without this the first offset would be misread as the number of writers
+            input.skipBytes(OFFSETS_SIZE);
+            abort(input);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Resets the traversal state and restores the column writers, the field names and the schema from
+     * {@code input}, which is read in the order used by {@code serializeChanges()}: writers, field names,
+     * schema and, if there is a meta root, the meta schema.
+     */
+    private void abort(DataInputStream input) throws IOException {
+        level = -1;
+        repeated = 0;
+        changed = false;
+
+        columnWriters.clear();
+        deserializeWriters(input, columnWriters, columnWriterFactory);
+
+        fieldNamesDictionary.abort(input);
+        definitionLevels.clear();
+        root.abort(input, definitionLevels);
+        if (metaRoot != null) {
+            //The meta schema is serialized right after the schema; it must be rolled back as well, otherwise
+            //it keeps its aborted children and has no definition levels after the clear() above
+            metaRoot.abort(input, definitionLevels);
+        }
+    }
+
+    /**
+     * Reads the number of writers from {@code input}, then deserializes that many writers and appends them to
+     * {@code writers}.
+     */
+    public static void deserializeWriters(DataInput input, List<IColumnValuesWriter> writers,
+            IColumnValuesWriterFactory columnWriterFactory) throws IOException {
+        for (int remaining = input.readInt(); remaining > 0; remaining--) {
+            writers.add(AbstractColumnValuesWriter.deserialize(input, columnWriterFactory));
+        }
+    }
+
+    /* ********************************************************
+     * Column values related methods
+     * ********************************************************
+     */
+
+    /**
+     * Sets the {@link IColumnWriteMultiPageOp} used by the {@link IColumnValuesWriter}s and resets every
+     * existing writer.
+     *
+     * @param multiPageOp multi-buffer allocator
+     */
+    public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+        multiPageOpRef.setValue(multiPageOp);
+
+        //Reset writer for the first write
+        for (IColumnValuesWriter writer : columnWriters) {
+            writer.reset();
+        }
+    }
+
+    /**
+     * @param columnIndex index of the column
+     * @return the writer stored at {@code columnIndex}
+     */
+    public IColumnValuesWriter getWriter(int columnIndex) {
+        return this.columnWriters.get(columnIndex);
+    }
+
+    /* ********************************************************
+     * Schema related methods
+     * ********************************************************
+     */
+
+    /** @return the current nesting level (-1 when no node is entered) */
+    public int getLevel() {
+        return this.level;
+    }
+
+    /** @return the number of column writers, i.e. the number of columns */
+    @Override
+    public int getNumberOfColumns() {
+        return this.columnWriters.size();
+    }
+
+    public AbstractSchemaNode getOrCreateChild(AbstractSchemaNode child, ATypeTag childTypeTag)
+            throws HyracksDataException {
+        AbstractSchemaNode currentChild = child;
+        ATypeTag normalizedTypeTag = getNormalizedTypeTag(childTypeTag);
+        if (currentChild == null || normalizedTypeTag != ATypeTag.MISSING && normalizedTypeTag != ATypeTag.NULL
+                && currentChild.getTypeTag() != ATypeTag.UNION && currentChild.getTypeTag() != normalizedTypeTag) {
+            //Create a new child or union type if required type is different from the current child type
+            currentChild = createChild(child, normalizedTypeTag);
+            //Flag that the schema has changed
+            changed = true;
+        }
+        return currentChild;
+    }
+
+    /**
+     * Goes one level deeper; entering a collection also increases the count of entered collections.
+     */
+    public void enterLevel(AbstractSchemaNestedNode node) {
+        level += 1;
+        repeated += node.isCollection() ? 1 : 0;
+    }
+
+    /**
+     * Goes one level up; exiting a collection also decreases the count of entered collections.
+     */
+    public void exitLevel(AbstractSchemaNestedNode node) {
+        level -= 1;
+        repeated -= node.isCollection() ? 1 : 0;
+    }
+
+    /**
+     * Brings the definition levels of {@code node} up to date with those of {@code parent} and, when
+     * {@code node} is an object or a collection, enters one more level.
+     */
+    public void enterNode(AbstractSchemaNestedNode parent, AbstractSchemaNode node) throws HyracksDataException {
+        //Flush all definition levels from parent to child
+        flushDefinitionLevels(level, parent, node);
+        if (!node.isObjectOrCollection()) {
+            return;
+        }
+        //Enter one more level for object, array, and multiset
+        level++;
+        if (node.isCollection()) {
+            //Tells nested values that they are repeated
+            repeated++;
+        }
+    }
+
+    public void exitNode(AbstractSchemaNode node) {
+        if (node.isNested()) {
+            //Add the nested node's level for all missing children (i.e., not entered for a record)
+            definitionLevels.get((AbstractSchemaNestedNode) node).add(level);
+            if (node.isObjectOrCollection()) {
+                //Union nodes should not change the level as they are logical nodes
+                level--;
+            }
+        }
+        node.incrementCounter();
+    }
+
+    /**
+     * Leaves a collection: appends a delimiter (the level above the collection's items) to the collection's
+     * definition levels, goes one level up and increments the collection's counter.
+     *
+     * @param numberOfItems not used by this method
+     */
+    public void exitCollectionNode(AbstractCollectionSchemaNode collectionNode, int numberOfItems) {
+        final int delimiter = level - 1;
+        //Add delimiter
+        definitionLevels.get(collectionNode).add(delimiter);
+        level = delimiter;
+        repeated--;
+        collectionNode.incrementCounter();
+    }
+
+    /**
+     * Needed by {@link AbstractCollectionSchemaNode} to add the definition level for each item
+     *
+     * @param collectionSchemaNode collection node
+     * @return collection node's definition level
+     */
+    public RunLengthIntArray getDefinitionLevels(AbstractCollectionSchemaNode collectionSchemaNode) {
+        return definitionLevels.get(collectionSchemaNode);
+    }
+
+    public void clearDefinitionLevels(AbstractSchemaNestedNode nestedNode) {
+        definitionLevels.get(nestedNode).reset();
+    }
+
+    /**
+     * Propagates to {@code node} the definition levels that {@code parent} has accumulated beyond the
+     * node's counter. Nothing is done when {@code parent} is {@code null} or the node is already up to date.
+     *
+     * @param level  level of the parent; the child's null mask is computed for {@code level + 1}
+     * @param parent parent of {@code node}, may be {@code null}
+     * @param node   node to bring up to date
+     */
+    public void flushDefinitionLevels(int level, AbstractSchemaNestedNode parent, AbstractSchemaNode node)
+            throws HyracksDataException {
+        if (parent == null) {
+            return;
+        }
+        RunLengthIntArray parentDefLevels = definitionLevels.get(parent);
+        if (node.getCounter() >= parentDefLevels.getSize()) {
+            return;
+        }
+        int parentMask = ColumnValuesUtil.getNullMask(level);
+        int childMask = ColumnValuesUtil.getNullMask(level + 1);
+        flushDefinitionLevels(parentMask, childMask, parentDefLevels, node);
+    }
+
+    private void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels,
+            AbstractSchemaNode node) throws HyracksDataException {
+        int startIndex = node.getCounter();
+        if (node.isNested()) {
+            RunLengthIntArray childDefLevels = definitionLevels.get((AbstractSchemaNestedNode) node);
+            flushNestedDefinitionLevel(parentMask, childMask, startIndex, parentDefLevels, childDefLevels);
+        } else {
+            IColumnValuesWriter writer = columnWriters.get(((PrimitiveSchemaNode) node).getColumnIndex());
+            flushWriterDefinitionLevels(parentMask, childMask, startIndex, parentDefLevels, writer);
+        }
+        node.setCounter(parentDefLevels.getSize());
+    }
+
+    /**
+     * Appends to {@code childDefLevels} the child value of every parent definition level from
+     * {@code startIndex} onwards, block by block. Nothing is done when the parent has no definition levels.
+     */
+    private void flushNestedDefinitionLevel(int parentMask, int childMask, int startIndex,
+            RunLengthIntArray parentDefLevels, RunLengthIntArray childDefLevels) {
+        if (parentDefLevels.getSize() == 0) {
+            return;
+        }
+        //startIndex might be at the middle of a block, so the block that holds it is handled on its own:
+        //only the values from startIndex to the end of that block are added
+        int firstBlock = parentDefLevels.getBlockIndex(startIndex);
+        int firstBlockRemaining = parentDefLevels.getBlockSize(firstBlock, startIndex);
+        int firstBlockChildValue =
+                ColumnValuesUtil.getChildValue(parentMask, childMask, parentDefLevels.getBlockValue(firstBlock));
+        childDefLevels.add(firstBlockChildValue, firstBlockRemaining);
+
+        //Every following block is added as a whole batch
+        int block = firstBlock + 1;
+        while (block < parentDefLevels.getNumberOfBlocks()) {
+            int childValue =
+                    ColumnValuesUtil.getChildValue(parentMask, childMask, parentDefLevels.getBlockValue(block));
+            childDefLevels.add(childValue, parentDefLevels.getBlockSize(block));
+            block++;
+        }
+    }
+
+    /**
+     * Writes to {@code writer} the child value of every parent definition level from {@code startIndex}
+     * onwards, block by block. Nothing is done when the parent has no definition levels.
+     */
+    private void flushWriterDefinitionLevels(int parentMask, int childMask, int startIndex,
+            RunLengthIntArray parentDefLevels, IColumnValuesWriter writer) throws HyracksDataException {
+        if (parentDefLevels.getSize() == 0) {
+            return;
+        }
+        /*
+         * Only a fraction of the first block might be needed: determine how many of its definition levels
+         * lie at or after startIndex and write just those.
+         */
+        int firstBlock = parentDefLevels.getBlockIndex(startIndex);
+        int firstBlockRemaining = parentDefLevels.getBlockSize(firstBlock, startIndex);
+        int firstBlockChildValue =
+                ColumnValuesUtil.getChildValue(parentMask, childMask, parentDefLevels.getBlockValue(firstBlock));
+        writer.writeLevels(firstBlockChildValue, firstBlockRemaining);
+
+        //Every following block is written as a whole
+        int block = firstBlock + 1;
+        while (block < parentDefLevels.getNumberOfBlocks()) {
+            int childValue =
+                    ColumnValuesUtil.getChildValue(parentMask, childMask, parentDefLevels.getBlockValue(block));
+            writer.writeLevels(childValue, parentDefLevels.getBlockSize(block));
+            block++;
+        }
+    }
+
+    /**
+     * Creates the node that replaces {@code child} for a value of {@code normalizedTypeTag}:
+     * a plain new node when there is no child yet, a new node that inherits the definition levels of a NULL
+     * child, or a union of the existing child and a new node in every other case.
+     */
+    private AbstractSchemaNode createChild(AbstractSchemaNode child, ATypeTag normalizedTypeTag)
+            throws HyracksDataException {
+        if (child == null) {
+            return createChild(normalizedTypeTag);
+        }
+        if (child.getTypeTag() != ATypeTag.NULL) {
+            //Different type. Make union
+            return addDefinitionLevelsAndGet(new UnionSchemaNode(child, createChild(normalizedTypeTag)));
+        }
+        //The previous child was a NULL. The new child needs to inherit the NULL definition levels
+        int nullColumnIndex = ((PrimitiveSchemaNode) child).getColumnIndex();
+        RunLengthIntArray nullDefLevels = columnWriters.get(nullColumnIndex).getDefinitionLevelsIntArray();
+        //Add the column index to be garbage collected
+        nullWriterIndexes.add(nullColumnIndex);
+        AbstractSchemaNode replacement = createChild(normalizedTypeTag);
+        int mask = ColumnValuesUtil.getNullMask(level);
+        flushDefinitionLevels(mask, mask, nullDefLevels, replacement);
+        return replacement;
+    }
+
+    /**
+     * Creates a schema node for {@code normalizedTypeTag}: a nested node (registered with fresh definition
+     * levels) for OBJECT, ARRAY and MULTISET, or a primitive node backed by a column writer for the
+     * supported primitive type tags.
+     *
+     * @throws IllegalStateException if the type tag is not supported
+     */
+    private AbstractSchemaNode createChild(ATypeTag normalizedTypeTag) throws HyracksDataException {
+        switch (normalizedTypeTag) {
+            case OBJECT:
+                return addDefinitionLevelsAndGet(new ObjectSchemaNode());
+            case ARRAY:
+                return addDefinitionLevelsAndGet(new ArraySchemaNode());
+            case MULTISET:
+                return addDefinitionLevelsAndGet(new MultisetSchemaNode());
+            case NULL:
+            case MISSING:
+            case BOOLEAN:
+            case DOUBLE:
+            case BIGINT:
+            case STRING:
+            case UUID:
+                return createPrimitiveChild(normalizedTypeTag);
+            default:
+                throw new IllegalStateException("Unsupported type " + normalizedTypeTag);
+
+        }
+    }
+
+    /**
+     * Creates a primitive node and its column writer. The column index is the oldest recorded index of a
+     * replaced NULL column if there is one, otherwise the next new index. Columns whose index is below the
+     * number of primary keys are treated as primary keys.
+     */
+    private AbstractSchemaNode createPrimitiveChild(ATypeTag normalizedTypeTag) throws HyracksDataException {
+        int columnIndex;
+        if (nullWriterIndexes.isEmpty()) {
+            columnIndex = columnWriters.size();
+        } else {
+            columnIndex = nullWriterIndexes.removeInt(0);
+        }
+        boolean primaryKey = columnIndex < getNumberOfPrimaryKeys();
+        int maxLevel = primaryKey ? 1 : level + 1;
+        //Arguments after maxLevel: writeAlways (primary keys and repeated values), filtered (non-keys)
+        IColumnValuesWriter writer = columnWriterFactory.createValueWriter(normalizedTypeTag, columnIndex,
+                maxLevel, primaryKey || repeated > 0, !primaryKey);
+        if (multiPageOpRef.getValue() != null) {
+            writer.reset();
+        }
+        addColumn(columnIndex, writer);
+        return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, primaryKey);
+    }
+
+    /**
+     * Stores {@code writer} at {@code index}: appended when {@code index} is the next new column index,
+     * otherwise it replaces the writer currently stored there.
+     */
+    private void addColumn(int index, IColumnValuesWriter writer) {
+        if (index != columnWriters.size()) {
+            columnWriters.set(index, writer);
+            return;
+        }
+        columnWriters.add(writer);
+    }
+
+    /**
+     * Registers fresh (empty) definition levels for {@code nestedNode}, replacing any previous ones.
+     *
+     * @return {@code nestedNode}
+     */
+    private AbstractSchemaNode addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) {
+        RunLengthIntArray freshDefLevels = new RunLengthIntArray();
+        definitionLevels.put(nestedNode, freshDefLevels);
+        return nestedNode;
+    }
+
+    public static ATypeTag getNormalizedTypeTag(ATypeTag typeTag) {
+        switch (typeTag) {
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+                return ATypeTag.BIGINT;
+            case FLOAT:
+                return ATypeTag.DOUBLE;
+            default:
+                return typeTag;
+        }
+    }
+
+    /**
+     * Drops the reference to the multi-page operation and closes every column writer.
+     */
+    public void close() {
+        //Dereference multiPageOp
+        multiPageOpRef.setValue(null);
+        for (IColumnValuesWriter writer : columnWriters) {
+            writer.close();
+        }
+    }
+
+    public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node)
+            throws HyracksDataException {
+        //Flush all definition levels from parent to the current node
+        flushDefinitionLevels(level, parent, node);
+        //Add null value (+2) to say that both the parent and the child are present
+        definitionLevels.get(node).add(ColumnValuesUtil.getNullMask(level + 2) | level);
+        node.incrementCounter();
+    }
+}