[ASTERIXDB-3133][STO] Added Trie based fieldNames dictionary

    - user model changes: no
    - storage format changes: no
    - interface changes: no

    Details:
    Implemented trie based fieldNames dictionary and
    replaced hash based dictionary in favour of faster
    lookup time, as hash based fieldNames dictionary
    would have gotten costlier considering hash collisions.

Change-Id: Ib2fcd5c5705343e8497e792e5e91084b1068701e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18333
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
index a3101e8..764b1b9 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.assembler;
 
+import static org.apache.asterix.column.metadata.AbstractFieldNamesDictionary.DUMMY_FIELD_NAME_INDEX;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
@@ -27,7 +29,6 @@
 
 import org.apache.asterix.column.assembler.value.IValueGetter;
 import org.apache.asterix.column.assembler.value.IValueGetterFactory;
-import org.apache.asterix.column.metadata.FieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
@@ -102,7 +103,7 @@
             for (int i = 0; i < childrenFieldNameIndexes.size(); i++) {
                 int fieldNameIdx = childrenFieldNameIndexes.getInt(i);
                 AbstractSchemaNode childNode = objectNode.getChild(fieldNameIdx);
-                if (fieldNameIdx == FieldNamesDictionary.DUMMY_FIELD_NAME_INDEX || !declaredFields.get(fieldNameIdx)) {
+                if (fieldNameIdx == DUMMY_FIELD_NAME_INDEX || !declaredFields.get(fieldNameIdx)) {
                     numberOfAddedChildren++;
                     IAType childType = getChildType(childNode, BuiltinType.ANY);
                     IValueReference fieldName = columnMetadata.getFieldNamesDictionary().getFieldName(fieldNameIdx);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractFieldNamesDictionary.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractFieldNamesDictionary.java
new file mode 100644
index 0000000..f22631b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractFieldNamesDictionary.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.column.metadata;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+public abstract class AbstractFieldNamesDictionary implements IFieldNamesDictionary {
+    /**
+     * Dummy field name used to add a column when encountering empty object
+     */
+    public static final IValueReference DUMMY_FIELD_NAME;
+    public static final int DUMMY_FIELD_NAME_INDEX = -1;
+
+    //For declared fields
+    private final AMutableString mutableString;
+    private final AStringSerializerDeserializer stringSerDer;
+
+    static {
+        VoidPointable dummy = new VoidPointable();
+        dummy.set(new byte[0], 0, 0);
+        DUMMY_FIELD_NAME = dummy;
+    }
+
+    AbstractFieldNamesDictionary() {
+        mutableString = new AMutableString("");
+        stringSerDer = new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+    }
+
+    static ArrayBackedValueStorage creatFieldName(IValueReference fieldName) throws HyracksDataException {
+        ArrayBackedValueStorage copy = new ArrayBackedValueStorage(fieldName.getLength());
+        copy.append(fieldName);
+        return copy;
+    }
+
+    protected ArrayBackedValueStorage creatFieldName(String fieldName) throws HyracksDataException {
+        ArrayBackedValueStorage serializedFieldName = new ArrayBackedValueStorage();
+        serializeFieldName(fieldName, serializedFieldName);
+        return serializedFieldName;
+    }
+
+    protected void serializeFieldName(String fieldName, ArrayBackedValueStorage storage) throws HyracksDataException {
+        mutableString.setValue(fieldName);
+        stringSerDer.serialize(mutableString, storage.getDataOutput());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNameTrie.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNameTrie.java
new file mode 100644
index 0000000..e31026e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNameTrie.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public class FieldNameTrie {
+    private static final int VERSION = 1;
+    private final LookupState lookupState;
+
+    private final List<IValueReference> fieldNames;
+    private TrieNode rootNode;
+
+    public FieldNameTrie() {
+        this(new ArrayList<>());
+    }
+
+    private FieldNameTrie(List<IValueReference> fieldNames) {
+        this.fieldNames = fieldNames;
+        this.rootNode = new TrieNode();
+        lookupState = new LookupState();
+    }
+
+    public int insert(IValueReference fieldName) throws HyracksDataException {
+        int presentIndex = lookup(fieldName);
+        if (presentIndex == TrieNode.NOT_FOUND_INDEX) {
+            presentIndex = hookup(FieldNamesTrieDictionary.creatFieldName(fieldName));
+        }
+        return presentIndex;
+    }
+
+    public int lookup(IValueReference fieldName) {
+        //noinspection DuplicatedCode
+        int len = UTF8StringUtil.getUTFLength(fieldName.getByteArray(), fieldName.getStartOffset());
+        int start = fieldName.getStartOffset() + UTF8StringUtil.getNumBytesToStoreLength(len);
+        byte[] bytes = fieldName.getByteArray();
+
+        TrieNode searchNode = rootNode;
+        TrieNode prevNode = searchNode;
+
+        int byteIndex = start;
+        // previousByteIndex should point to the first byte to be compared
+        // when inserting the fieldName
+        int previousByteIndex = byteIndex;
+
+        int lastIndex = (start + len - 1);
+        while (byteIndex <= lastIndex) {
+            byte b = bytes[byteIndex];
+
+            TrieNode nextNode = searchNode.getChild(b);
+            if (nextNode == null) {
+                // saving state in case hookup is requested
+                lookupState.setState(prevNode, start, previousByteIndex, len);
+                return TrieNode.NOT_FOUND_INDEX;
+            }
+            // if the node exists, then compare the remaining byte seq.
+            prevNode = searchNode;
+            searchNode = nextNode;
+
+            if (searchNode.getLength() > 1) { // first byte will be same as byteIndex
+                // compare the stored sequence.
+                int fieldBytesLeftToCompare = lastIndex - byteIndex + 1;
+                // if the stored sequence in node is greater than the input field's
+                // byte to compare, then the result won't be there.
+                if (fieldBytesLeftToCompare < searchNode.getLength()) {
+                    // saving state in case hookup is requested
+                    lookupState.setState(prevNode, start, byteIndex, len);
+                    return TrieNode.NOT_FOUND_INDEX;
+                }
+
+                int c = 0;
+                byte[] storedFieldBytes = fieldNames.get(searchNode.getIndex()).getByteArray();
+                int storedFieldStart = searchNode.getStart();
+                previousByteIndex = byteIndex;
+                while (c < searchNode.getLength()) {
+                    if (bytes[byteIndex] != storedFieldBytes[storedFieldStart + c]) {
+                        // saving state in case hookup is requested
+                        // will restart from oldByteIndex, to make logic simpler.
+                        // other way could have been to store the splitIndex.
+                        lookupState.setState(prevNode, start, previousByteIndex, len);
+                        return TrieNode.NOT_FOUND_INDEX;
+                    }
+                    c++;
+                    byteIndex++;
+                }
+            } else {
+                previousByteIndex = byteIndex;
+                byteIndex++;
+            }
+        }
+
+        // saving state in case hookup is requested
+        lookupState.setState(prevNode, start, previousByteIndex, len);
+        return searchNode.isEndOfField() ? searchNode.getIndex() : TrieNode.NOT_FOUND_INDEX;
+    }
+
+    private int hookup(IValueReference fieldName) {
+        // since lookup operation always precedes a hookup operation
+        // we can use the saved state to start hookup.
+        int len = lookupState.getFieldLength();
+        TrieNode searchNode = lookupState.getLastNode();
+
+        // resume from the stored node.
+        int bytesToStoreLength = UTF8StringUtil.getNumBytesToStoreLength(len);
+        int start = bytesToStoreLength;
+
+        int byteIndex = lookupState.getRelativeOffsetFromStart() + bytesToStoreLength;
+        byte[] bytes = fieldName.getByteArray();
+        int lastIndex = (start + len - 1);
+        while (byteIndex <= lastIndex) {
+            byte b = bytes[byteIndex];
+            TrieNode nextNode = searchNode.getChild(b);
+            if (nextNode == null) {
+                // since there no such node, then create a node, and put
+                // rest bytes in the nodes.
+                TrieNode childNode = new TrieNode();
+                // first insert, then add the field
+                // start from byteIndex with newLength.
+                // newLength = lastIndex - byteIndex + 1
+                childNode.setIndex(fieldNames.size(), byteIndex, lastIndex - byteIndex + 1, bytesToStoreLength);
+                childNode.setIsEndOfField(true);
+                fieldNames.add(fieldName);
+
+                searchNode.putChild(b, childNode);
+                return childNode.getIndex();
+            }
+            // if node exists, compare the remaining byte seq
+            searchNode = nextNode;
+
+            if (searchNode.getLength() > 1) {
+                // compare the byte seq
+                int c = 0;
+                int oldByteIndex = byteIndex;
+
+                IValueReference storedFieldName = fieldNames.get(searchNode.getIndex());
+                byte[] storedFieldBytes = storedFieldName.getByteArray();
+                int storedFieldStart = searchNode.getStart();
+                while (c < Math.min(searchNode.getLength(), lastIndex - oldByteIndex + 1)) {
+                    if (bytes[byteIndex] != storedFieldBytes[storedFieldStart + c]) {
+                        break;
+                    }
+                    c++;
+                    byteIndex++;
+                }
+
+                // from c & byteIndex, things are not matching,
+                // split into two nodes,
+                // one from (c, ...) -> handled below
+                // other from (byteIndex, ...) -> handled in the next iteration, as byteIndex will be absent.
+
+                // handling (c, ..)
+                int leftToSplitForCurrentNode = searchNode.getLength() - c;
+                if (leftToSplitForCurrentNode > 0) {
+                    searchNode.split(storedFieldName, c);
+                }
+            } else {
+                byteIndex++;
+            }
+        }
+
+        // since the node is already present,
+        // point it to the current fieldName, and update the start and length based on the fieldName
+        // prefix would be the same
+        // find absolute starting point in the current fieldName
+        int diff = searchNode.getStart() - searchNode.getBytesToStoreLength();
+        // since hookup happens on a new fieldName, hence start will be bytesToStoreLength
+        searchNode.setIndex(fieldNames.size(), start + diff, searchNode.getLength(), bytesToStoreLength);
+        searchNode.setIsEndOfField(true);
+        fieldNames.add(fieldName);
+        return searchNode.getIndex();
+    }
+
+    public void serialize(DataOutput out) throws IOException {
+        out.writeInt(VERSION);
+
+        // serialize fieldNames
+        out.writeInt(fieldNames.size());
+        for (IValueReference fieldName : fieldNames) {
+            out.writeInt(fieldName.getLength());
+            out.write(fieldName.getByteArray(), fieldName.getStartOffset(), fieldName.getLength());
+        }
+
+        rootNode.serialize(out);
+    }
+
+    public static FieldNameTrie deserialize(DataInput in) throws IOException {
+        int version = in.readInt();
+        switch (version) {
+            case VERSION:
+                return deserializeV1(in);
+            default:
+                throw new IllegalStateException("Unsupported version: " + version);
+        }
+    }
+
+    private static FieldNameTrie deserializeV1(DataInput in) throws IOException {
+        int numberOfFieldNames = in.readInt();
+
+        List<IValueReference> fieldNames = new ArrayList<>();
+        deserializeFieldNames(in, fieldNames, numberOfFieldNames);
+
+        FieldNameTrie newTrie = new FieldNameTrie(fieldNames);
+        newTrie.rootNode = TrieNode.deserialize(in);
+
+        return newTrie;
+    }
+
+    private static void deserializeFieldNames(DataInput input, List<IValueReference> fieldNames, int numberOfFieldNames)
+            throws IOException {
+        for (int i = 0; i < numberOfFieldNames; i++) {
+            int length = input.readInt();
+            ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage(length);
+            fieldName.setSize(length);
+            input.readFully(fieldName.getByteArray(), 0, length);
+            fieldNames.add(fieldName);
+        }
+    }
+
+    public List<IValueReference> getFieldNames() {
+        return fieldNames;
+    }
+
+    public IValueReference getFieldName(int fieldIndex) {
+        return fieldNames.get(fieldIndex);
+    }
+
+    public void clear() {
+        rootNode = null;
+        fieldNames.clear();
+    }
+
+    @Override
+    public String toString() {
+        TrieNode currentNode = rootNode;
+        Queue<TrieNode> queue = new LinkedList<>();
+        for (TrieNode node : currentNode.getChildren().values()) {
+            queue.offer(node);
+        }
+        StringBuilder treeBuilder = new StringBuilder();
+        while (!queue.isEmpty()) {
+            int len = queue.size();
+            for (int i = 0; i < len; i++) {
+                TrieNode node = queue.poll();
+                assert node != null;
+                byte[] bytes = fieldNames.get(node.getIndex()).getByteArray();
+                for (int j = 0; j < node.getLength(); j++) {
+                    treeBuilder.append((char) bytes[node.getStart() + j]);
+                }
+                treeBuilder.append("(").append(node.isEndOfField()).append(")");
+                if (i != len - 1) {
+                    treeBuilder.append(" | ");
+                }
+
+                for (TrieNode child : node.getChildren().values()) {
+                    queue.offer(child);
+                }
+            }
+            treeBuilder.append("\n");
+        }
+        return treeBuilder.toString();
+    }
+
+    class LookupState {
+        private TrieNode lastNode;
+        private int relativeOffsetFromStart;
+        private int fieldLength;
+
+        public void setState(TrieNode lastNode, int startIndex, int continuationByteIndex, int fieldLength) {
+            this.lastNode = lastNode;
+            this.relativeOffsetFromStart = continuationByteIndex - startIndex;
+            this.fieldLength = fieldLength;
+        }
+
+        public TrieNode getLastNode() {
+            return lastNode;
+        }
+
+        public int getRelativeOffsetFromStart() {
+            return relativeOffsetFromStart;
+        }
+
+        public int getFieldLength() {
+            return fieldLength;
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesHashDictionary.java
similarity index 78%
rename from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
rename to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesHashDictionary.java
index 9ac226b..c83b289 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesHashDictionary.java
@@ -25,70 +25,50 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
-import org.apache.asterix.om.base.AMutableString;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.util.string.UTF8StringReader;
-import org.apache.hyracks.util.string.UTF8StringWriter;
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 
-public class FieldNamesDictionary {
-    /**
-     * Dummy field name used to add a column when encountering empty object
-     */
-    public static final IValueReference DUMMY_FIELD_NAME;
-    public static final int DUMMY_FIELD_NAME_INDEX = -1;
+public class FieldNamesHashDictionary extends AbstractFieldNamesDictionary {
     //For both declared and inferred fields
     private final List<IValueReference> fieldNames;
     private final Object2IntMap<String> declaredFieldNamesToIndexMap;
     private final Int2IntMap hashToFieldNameIndexMap;
     private final IBinaryHashFunction fieldNameHashFunction;
 
-    //For declared fields
-    private final AMutableString mutableString;
-    private final AStringSerializerDeserializer stringSerDer;
-
     //For lookups
     private final ArrayBackedValueStorage lookupStorage;
 
-    static {
-        VoidPointable dummy = new VoidPointable();
-        dummy.set(new byte[0], 0, 0);
-        DUMMY_FIELD_NAME = dummy;
-    }
-
-    public FieldNamesDictionary() {
+    public FieldNamesHashDictionary() {
         this(new ArrayList<>(), new Object2IntOpenHashMap<>(), new Int2IntOpenHashMap());
     }
 
-    private FieldNamesDictionary(List<IValueReference> fieldNames, Object2IntMap<String> declaredFieldNamesToIndexMap,
-            Int2IntMap hashToFieldNameIndexMap) {
+    private FieldNamesHashDictionary(List<IValueReference> fieldNames,
+            Object2IntMap<String> declaredFieldNamesToIndexMap, Int2IntMap hashToFieldNameIndexMap) {
+        super();
         this.fieldNames = fieldNames;
         this.declaredFieldNamesToIndexMap = declaredFieldNamesToIndexMap;
         this.hashToFieldNameIndexMap = hashToFieldNameIndexMap;
-
-        mutableString = new AMutableString("");
-        stringSerDer = new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
         fieldNameHashFunction =
                 new PointableBinaryHashFunctionFactory(UTF8StringPointable.FACTORY).createBinaryHashFunction();
         lookupStorage = new ArrayBackedValueStorage();
     }
 
+    @Override
     public List<IValueReference> getFieldNames() {
         return fieldNames;
     }
 
     //TODO solve collision (they're so rare that I haven't seen any)
+    @Override
     public int getOrCreateFieldNameIndex(IValueReference fieldName) throws HyracksDataException {
         if (fieldName == DUMMY_FIELD_NAME) {
             return DUMMY_FIELD_NAME_INDEX;
@@ -103,6 +83,7 @@
         return hashToFieldNameIndexMap.get(hash);
     }
 
+    @Override
     public int getOrCreateFieldNameIndex(String fieldName) throws HyracksDataException {
         if (!declaredFieldNamesToIndexMap.containsKey(fieldName)) {
             IValueReference serializedFieldName = creatFieldName(fieldName);
@@ -114,29 +95,13 @@
         return declaredFieldNamesToIndexMap.getInt(fieldName);
     }
 
+    @Override
     public int getFieldNameIndex(String fieldName) throws HyracksDataException {
         lookupStorage.reset();
         serializeFieldName(fieldName, lookupStorage);
         return hashToFieldNameIndexMap.getOrDefault(getHash(lookupStorage), -1);
     }
 
-    private ArrayBackedValueStorage creatFieldName(IValueReference fieldName) throws HyracksDataException {
-        ArrayBackedValueStorage copy = new ArrayBackedValueStorage(fieldName.getLength());
-        copy.append(fieldName);
-        return copy;
-    }
-
-    private ArrayBackedValueStorage creatFieldName(String fieldName) throws HyracksDataException {
-        ArrayBackedValueStorage serializedFieldName = new ArrayBackedValueStorage();
-        serializeFieldName(fieldName, serializedFieldName);
-        return serializedFieldName;
-    }
-
-    private void serializeFieldName(String fieldName, ArrayBackedValueStorage storage) throws HyracksDataException {
-        mutableString.setValue(fieldName);
-        stringSerDer.serialize(mutableString, storage.getDataOutput());
-    }
-
     private int getHash(IValueReference fieldName) throws HyracksDataException {
         byte[] object = fieldName.getByteArray();
         int start = fieldName.getStartOffset();
@@ -152,6 +117,7 @@
         return index;
     }
 
+    @Override
     public IValueReference getFieldName(int index) {
         if (index == DUMMY_FIELD_NAME_INDEX) {
             return DUMMY_FIELD_NAME;
@@ -159,6 +125,7 @@
         return fieldNames.get(index);
     }
 
+    @Override
     public void serialize(DataOutput output) throws IOException {
         output.writeInt(fieldNames.size());
         for (IValueReference fieldName : fieldNames) {
@@ -178,7 +145,7 @@
         }
     }
 
-    public static FieldNamesDictionary deserialize(DataInput input) throws IOException {
+    public static FieldNamesHashDictionary deserialize(DataInput input) throws IOException {
         int numberOfFieldNames = input.readInt();
 
         List<IValueReference> fieldNames = new ArrayList<>();
@@ -190,9 +157,10 @@
         Int2IntMap hashToFieldNameIndexMap = new Int2IntOpenHashMap();
         deserializeHashToFieldNameIndex(input, hashToFieldNameIndexMap, numberOfFieldNames);
 
-        return new FieldNamesDictionary(fieldNames, declaredFieldNamesToIndexMap, hashToFieldNameIndexMap);
+        return new FieldNamesHashDictionary(fieldNames, declaredFieldNamesToIndexMap, hashToFieldNameIndexMap);
     }
 
+    @Override
     public void abort(DataInputStream input) throws IOException {
         int numberOfFieldNames = input.readInt();
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesTrieDictionary.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesTrieDictionary.java
new file mode 100644
index 0000000..8b2d548
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesTrieDictionary.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class FieldNamesTrieDictionary extends AbstractFieldNamesDictionary {
+    private FieldNameTrie dictionary;
+    //For lookups
+    private final ArrayBackedValueStorage lookupStorage;
+
+    public FieldNamesTrieDictionary() {
+        this(new FieldNameTrie());
+    }
+
+    public FieldNamesTrieDictionary(FieldNameTrie dictionary) {
+        super();
+        this.dictionary = dictionary;
+        lookupStorage = new ArrayBackedValueStorage();
+    }
+
+    @Override
+    public List<IValueReference> getFieldNames() {
+        return dictionary.getFieldNames();
+    }
+
+    @Override
+    public int getOrCreateFieldNameIndex(IValueReference fieldName) throws HyracksDataException {
+        if (fieldName == DUMMY_FIELD_NAME) {
+            return DUMMY_FIELD_NAME_INDEX;
+        }
+
+        return dictionary.insert(fieldName);
+    }
+
+    @Override
+    public int getOrCreateFieldNameIndex(String fieldName) throws HyracksDataException {
+        return getOrCreateFieldNameIndex(creatFieldName(fieldName));
+    }
+
+    @Override
+    public int getFieldNameIndex(String fieldName) throws HyracksDataException {
+        lookupStorage.reset();
+        serializeFieldName(fieldName, lookupStorage);
+        return dictionary.lookup(lookupStorage);
+    }
+
+    @Override
+    public IValueReference getFieldName(int index) {
+        if (index == DUMMY_FIELD_NAME_INDEX) {
+            return DUMMY_FIELD_NAME;
+        }
+        return dictionary.getFieldName(index);
+    }
+
+    @Override
+    public void serialize(DataOutput output) throws IOException {
+        dictionary.serialize(output);
+    }
+
+    public static FieldNamesTrieDictionary deserialize(DataInput input) throws IOException {
+        FieldNameTrie fieldNameTrie = FieldNameTrie.deserialize(input);
+        return new FieldNamesTrieDictionary(fieldNameTrie);
+    }
+
+    @Override
+    public void abort(DataInputStream input) throws IOException {
+        dictionary.clear();
+        dictionary = FieldNameTrie.deserialize(input);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/IFieldNamesDictionary.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/IFieldNamesDictionary.java
new file mode 100644
index 0000000..8aa0e88
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/IFieldNamesDictionary.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata;
+
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * methods for defining the fieldName dictionary
+ * which is used to encode a fieldName to an index.
+ */
+public interface IFieldNamesDictionary {
+    /**
+     * @return get all the inserted field names
+     */
+    List<IValueReference> getFieldNames();
+
+    /**
+     * @param fieldName fieldName byte array
+     * @return returns index if field exist, otherwise insert fieldName and return the new index
+     * @throws HyracksDataException
+     */
+    int getOrCreateFieldNameIndex(IValueReference fieldName) throws HyracksDataException;
+
+    /**
+     * @param fieldName fieldName string
+     * @return returns index if field exist, otherwise insert fieldName and return the new index
+     * @throws HyracksDataException
+     */
+    int getOrCreateFieldNameIndex(String fieldName) throws HyracksDataException;
+
+    /**
+     * @param fieldName
+     * @return index of the field if exists otherwise -1
+     * @throws HyracksDataException
+     */
+    int getFieldNameIndex(String fieldName) throws HyracksDataException;
+
+    /**
+     * @param index encoded index
+     * @return the fieldName present at the requested index
+     */
+    IValueReference getFieldName(int index);
+
+    /**
+     * serialize the dictionary
+     * @param output
+     * @throws IOException
+     */
+    void serialize(DataOutput output) throws IOException;
+
+    /**
+     * resetting and rebuilding the dictionary
+     * @param input
+     * @throws IOException
+     */
+    void abort(DataInputStream input) throws IOException;
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/TrieNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/TrieNode.java
new file mode 100644
index 0000000..18e645b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/TrieNode.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.column.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hyracks.data.std.api.IValueReference;
+
+import it.unimi.dsi.fastutil.bytes.Byte2ObjectArrayMap;
+import it.unimi.dsi.fastutil.bytes.Byte2ObjectMap;
+
+class TrieNode {
+    public static final int NOT_FOUND_INDEX = -1;
+
+    private Byte2ObjectMap<TrieNode> children;
+    private boolean isEndOfField;
+    private int index;
+    private int start; // includes the edges' byte
+    private int length; // includes the edges' byte
+    private int bytesToStoreLength;
+
+    TrieNode() {
+        children = new Byte2ObjectArrayMap<>();
+        index = NOT_FOUND_INDEX;
+    }
+
+    TrieNode(Byte2ObjectMap<TrieNode> children) {
+        this.children = children;
+        index = NOT_FOUND_INDEX;
+    }
+
+    public void setIndex(int index, int start, int length, int bytesToStoreLength) {
+        this.index = index;
+        this.start = start;
+        this.length = length;
+        this.bytesToStoreLength = bytesToStoreLength;
+    }
+
+    public void setIsEndOfField(boolean isEndOfField) {
+        this.isEndOfField = isEndOfField;
+    }
+
+    public boolean containsKey(byte key) {
+        return children.containsKey(key);
+    }
+
+    public TrieNode getChild(byte key) {
+        return children.get(key);
+    }
+
+    public void putChild(byte key, TrieNode child) {
+        children.put(key, child);
+    }
+
+    public Byte2ObjectMap<TrieNode> getChildren() {
+        return children;
+    }
+
+    public int getIndex() {
+        return index;
+    }
+
+    public int getStart() {
+        return start;
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    public int getBytesToStoreLength() {
+        return bytesToStoreLength;
+    }
+
+    public boolean isEndOfField() {
+        return isEndOfField;
+    }
+
+    public void reset() {
+        // since this object went to the new node.
+        children = new Byte2ObjectArrayMap<>();
+    }
+
+    public void split(IValueReference fieldName, int splitIndex) {
+        byte[] storedFieldBytes = fieldName.getByteArray();
+        byte splitByte = storedFieldBytes[start + splitIndex];
+        // something to be split, have to create a new node
+        // and do the linking.
+        TrieNode childNode = new TrieNode(children);
+        int leftToSplit = length - splitIndex;
+        childNode.setIndex(index, start + splitIndex, leftToSplit, bytesToStoreLength);
+        childNode.setIsEndOfField(isEndOfField);
+        // update the current search node
+        // new length would be 'c'
+        reset();
+        setIndex(index, start, splitIndex, bytesToStoreLength);
+        putChild(splitByte, childNode);
+        // since there was a split in searchNode, hence isEndOfField will be false.
+        setIsEndOfField(false);
+    }
+
+    public void serialize(DataOutput out) throws IOException {
+        // serialize fields
+        out.writeBoolean(isEndOfField);
+        out.writeInt(index);
+        out.writeInt(start);
+        out.writeInt(length);
+        out.writeInt(bytesToStoreLength);
+
+        out.writeInt(children.size());
+        for (Map.Entry<Byte, TrieNode> entry : children.byte2ObjectEntrySet()) {
+            out.writeByte(entry.getKey());
+            entry.getValue().serialize(out);
+        }
+    }
+
+    public static TrieNode deserialize(DataInput in) throws IOException {
+        TrieNode node = new TrieNode();
+        node.isEndOfField = in.readBoolean();
+        node.index = in.readInt();
+        node.start = in.readInt();
+        node.length = in.readInt();
+        node.bytesToStoreLength = in.readInt();
+
+        int childrenSize = in.readInt();
+        for (int i = 0; i < childrenSize; i++) {
+            byte b = in.readByte();
+            node.children.put(b, TrieNode.deserialize(in));
+        }
+        return node;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
index 451ece5..6014bf6 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.metadata.schema;
 
+import static org.apache.asterix.column.metadata.AbstractFieldNamesDictionary.DUMMY_FIELD_NAME_INDEX;
+
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -27,7 +29,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.column.metadata.FieldNamesDictionary;
 import org.apache.asterix.column.metadata.PathInfoSerializer;
 import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
 import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
@@ -64,7 +65,7 @@
 
         fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
         deserializeFieldNameIndexToChildIndex(input, fieldNameIndexToChildIndexMap, numberOfChildren);
-        if (fieldNameIndexToChildIndexMap.containsKey(FieldNamesDictionary.DUMMY_FIELD_NAME_INDEX)) {
+        if (fieldNameIndexToChildIndexMap.containsKey(DUMMY_FIELD_NAME_INDEX)) {
             nextIndex = this::emptyColumnIndex;
         } else {
             nextIndex = this::nextIndex;
@@ -102,7 +103,7 @@
             return;
         }
         AbstractSchemaNode emptyChild = columnMetadata.getOrCreateChild(null, ATypeTag.MISSING);
-        addChild(FieldNamesDictionary.DUMMY_FIELD_NAME_INDEX, emptyChild);
+        addChild(DUMMY_FIELD_NAME_INDEX, emptyChild);
         nextIndex = this::emptyColumnIndex;
     }
 
@@ -196,7 +197,7 @@
 
     private int emptyColumnIndex(int fieldNameIndex) {
         nextIndex = this::nextIndex;
-        fieldNameIndexToChildIndexMap.remove(FieldNamesDictionary.DUMMY_FIELD_NAME_INDEX);
+        fieldNameIndexToChildIndexMap.remove(DUMMY_FIELD_NAME_INDEX);
         fieldNameIndexToChildIndexMap.put(fieldNameIndex, 0);
         return 0;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
index e6a85b8..05c4eda 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
@@ -22,7 +22,7 @@
 
 import java.util.List;
 
-import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.IFieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
@@ -133,7 +133,7 @@
             throws HyracksDataException {
         IAType[] fieldTypes = recordType.getFieldTypes();
         String[] fieldNames = recordType.getFieldNames();
-        FieldNamesDictionary dictionary = columnMetadata.getFieldNamesDictionary();
+        IFieldNamesDictionary dictionary = columnMetadata.getFieldNamesDictionary();
 
         if (isProcessingPrimaryKeys() && !fieldNames[fieldIndex].equals(currentPrimaryKeyPath.get(currentPathIndex))) {
             // Still processing PKs, do not add any fields to the children until all PKs are processed
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaClipperVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaClipperVisitor.java
index 7164304..afe8368 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaClipperVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaClipperVisitor.java
@@ -21,7 +21,7 @@
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.IFieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
@@ -40,12 +40,12 @@
 import org.apache.hyracks.api.exceptions.Warning;
 
 public class SchemaClipperVisitor implements IATypeVisitor<AbstractSchemaNode, AbstractSchemaNode> {
-    private final FieldNamesDictionary fieldNamesDictionary;
+    private final IFieldNamesDictionary fieldNamesDictionary;
     private final IWarningCollector warningCollector;
     private final Map<String, FunctionCallInformation> functionCallInfoMap;
     private boolean ignoreFlatType;
 
-    public SchemaClipperVisitor(FieldNamesDictionary fieldNamesDictionary,
+    public SchemaClipperVisitor(IFieldNamesDictionary fieldNamesDictionary,
             Map<String, FunctionCallInformation> functionCallInfoMap, IWarningCollector warningCollector) {
         this.fieldNamesDictionary = fieldNamesDictionary;
         this.functionCallInfoMap = functionCallInfoMap;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index 2cee533..04334a3 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -31,7 +31,8 @@
 import java.util.Map;
 
 import org.apache.asterix.column.metadata.AbstractColumnMetadata;
-import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.FieldNamesTrieDictionary;
+import org.apache.asterix.column.metadata.IFieldNamesDictionary;
 import org.apache.asterix.column.metadata.PathInfoSerializer;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
@@ -71,7 +72,7 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels;
     private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
-    private final FieldNamesDictionary fieldNamesDictionary;
+    private final IFieldNamesDictionary fieldNamesDictionary;
     private final ObjectSchemaNode root;
     private final ObjectSchemaNode metaRoot;
     private final IColumnValuesWriterFactory columnWriterFactory;
@@ -94,7 +95,7 @@
         columnWriters = new ArrayList<>();
         level = -1;
         repeated = 0;
-        fieldNamesDictionary = new FieldNamesDictionary();
+        fieldNamesDictionary = new FieldNamesTrieDictionary();
         root = new ObjectSchemaNode();
         metaRoot = metaType != null ? new ObjectSchemaNode() : null;
         pathInfoSerializer = new PathInfoSerializer();
@@ -124,7 +125,7 @@
     private FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys,
             boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters,
-            FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
+            IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
             Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
             ArrayBackedValueStorage serializedMetadata) {
         super(datasetType, metaType, primaryKeys.size());
@@ -146,7 +147,7 @@
         changed = false;
     }
 
-    public FieldNamesDictionary getFieldNamesDictionary() {
+    public IFieldNamesDictionary getFieldNamesDictionary() {
         return fieldNamesDictionary;
     }
 
@@ -249,7 +250,7 @@
         deserializeWriters(input, writers, columnWriterFactory);
 
         //FieldNames
-        FieldNamesDictionary fieldNamesDictionary = FieldNamesDictionary.deserialize(input);
+        IFieldNamesDictionary fieldNamesDictionary = FieldNamesTrieDictionary.deserialize(input);
 
         //Schema
         Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels = new HashMap<>();
@@ -574,7 +575,7 @@
     }
 
     private static void logSchema(ObjectSchemaNode root, ObjectSchemaNode metaRoot,
-            FieldNamesDictionary fieldNamesDictionary) throws HyracksDataException {
+            IFieldNamesDictionary fieldNamesDictionary) throws HyracksDataException {
         if (!LOGGER.isDebugEnabled()) {
             return;
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
index a697e0d..967369f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
@@ -39,7 +39,8 @@
 import org.apache.asterix.column.filter.range.IColumnRangeFilterEvaluatorFactory;
 import org.apache.asterix.column.filter.range.IColumnRangeFilterValueAccessor;
 import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
-import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.FieldNamesTrieDictionary;
+import org.apache.asterix.column.metadata.IFieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
@@ -65,7 +66,7 @@
  */
 public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final FieldNamesDictionary fieldNamesDictionary;
+    private final IFieldNamesDictionary fieldNamesDictionary;
     private final PrimitiveColumnValuesReader[] primaryKeyReaders;
     private final IColumnFilterEvaluator normalizedFilterEvaluator;
     private final List<IColumnRangeFilterValueAccessor> filterValueAccessors;
@@ -76,7 +77,7 @@
 
     protected QueryColumnMetadata(ARecordType datasetType, ARecordType metaType,
             PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
-            FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, IColumnValuesReaderFactory readerFactory,
+            IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, IColumnValuesReaderFactory readerFactory,
             IValueGetterFactory valueGetterFactory, IColumnFilterEvaluator normalizedFilterEvaluator,
             List<IColumnRangeFilterValueAccessor> filterValueAccessors,
             IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders,
@@ -96,7 +97,7 @@
         return assembler;
     }
 
-    public final FieldNamesDictionary getFieldNamesDictionary() {
+    public final IFieldNamesDictionary getFieldNamesDictionary() {
         return fieldNamesDictionary;
     }
 
@@ -188,7 +189,7 @@
         DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, fieldNamesStart, length));
 
         //FieldNames
-        FieldNamesDictionary fieldNamesDictionary = FieldNamesDictionary.deserialize(input);
+        IFieldNamesDictionary fieldNamesDictionary = FieldNamesTrieDictionary.deserialize(input);
 
         //Schema
         ObjectSchemaNode root = (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, null);
@@ -268,7 +269,7 @@
     }
 
     protected static void logSchema(String jobId, ObjectSchemaNode root, String schemaSource,
-            FieldNamesDictionary fieldNamesDictionary) throws HyracksDataException {
+            IFieldNamesDictionary fieldNamesDictionary) throws HyracksDataException {
         if (jobId != null && LOGGER.isDebugEnabled()) {
             SchemaStringBuilderVisitor schemaBuilder = new SchemaStringBuilderVisitor(fieldNamesDictionary);
             String schema = LogRedactionUtil.userData(schemaBuilder.build(root));
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
index 8b23c05..cda492c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
@@ -35,7 +35,8 @@
 import org.apache.asterix.column.filter.iterable.IColumnIterableFilterEvaluatorFactory;
 import org.apache.asterix.column.filter.range.IColumnRangeFilterEvaluatorFactory;
 import org.apache.asterix.column.filter.range.IColumnRangeFilterValueAccessor;
-import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.FieldNamesTrieDictionary;
+import org.apache.asterix.column.metadata.IFieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
@@ -61,7 +62,7 @@
 
     private QueryColumnWithMetaMetadata(ARecordType datasetType, ARecordType metaType,
             PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
-            FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
+            IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
             IColumnValuesReaderFactory readerFactory, IValueGetterFactory valueGetterFactory,
             IColumnFilterEvaluator filterEvaluator, List<IColumnRangeFilterValueAccessor> filterValueAccessors,
             IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders,
@@ -132,7 +133,7 @@
         DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, fieldNamesStart, length));
 
         //FieldNames
-        FieldNamesDictionary fieldNamesDictionary = FieldNamesDictionary.deserialize(input);
+        IFieldNamesDictionary fieldNamesDictionary = FieldNamesTrieDictionary.deserialize(input);
 
         //Schema
         ObjectSchemaNode root = (ObjectSchemaNode) AbstractSchemaNode.deserialize(input, null);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/SchemaStringBuilderVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/SchemaStringBuilderVisitor.java
index 7134b2f..571ab83 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/SchemaStringBuilderVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/SchemaStringBuilderVisitor.java
@@ -21,7 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.asterix.column.metadata.FieldNamesDictionary;
+import org.apache.asterix.column.metadata.IFieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
@@ -48,7 +48,7 @@
     private int level;
     private int indent;
 
-    public SchemaStringBuilderVisitor(FieldNamesDictionary dictionary) throws HyracksDataException {
+    public SchemaStringBuilderVisitor(IFieldNamesDictionary dictionary) throws HyracksDataException {
         builder = new StringBuilder();
         this.fieldNames = new ArrayList<>();
         AStringSerializerDeserializer stringSerDer =
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/metadata/trie/FieldNameTrieTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/metadata/trie/FieldNameTrieTest.java
new file mode 100644
index 0000000..d996963
--- /dev/null
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/metadata/trie/FieldNameTrieTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.column.metadata.trie;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.column.metadata.FieldNameTrie;
+import org.apache.asterix.column.metadata.FieldNamesTrieDictionary;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleInputStream;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import org.apache.hyracks.data.std.util.RewindableDataOutputStream;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FieldNameTrieTest {
+
+    private static final int NOT_FOUND_INDEX = -1;
+    private FieldNameTrie trie;
+    private FieldNamesTrieDictionary dictionary;
+
+    @Before
+    public void init() {
+        trie = new FieldNameTrie();
+        dictionary = new FieldNamesTrieDictionary();
+    }
+
+    @After
+    public void cleanup() {
+        if (trie != null) {
+            trie.clear();
+        }
+    }
+
+    @Test
+    public void asciiTest() throws HyracksDataException {
+        List<String> fieldNames =
+                List.of("ant", "age", "agent", "agitation", "agitated", "name", "aghast", "anti", "anon");
+
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void insertTest1() throws HyracksDataException {
+        List<String> fieldNames = List.of("ap", "app", "apple");
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void insertTest2() throws HyracksDataException {
+        List<String> fieldNames = List.of("ap", "apple", "app");
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void insertTest3() throws HyracksDataException {
+        List<String> fieldNames = List.of("app", "apple", "ap");
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void insertTest4() throws HyracksDataException {
+        List<String> fieldNames = List.of("app", "ap", "apple");
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void insertTest5() throws HyracksDataException {
+        List<String> fieldNames = List.of("apple", "app", "ap");
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void insertTest6() throws HyracksDataException {
+        List<String> fieldNames = List.of("apple", "ap", "app");
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void unicodeTest() throws HyracksDataException {
+        List<String> fieldNames = List.of(
+                // Basic Insert and Search
+                "apple", "app", "application", "苹果", "应用", "सेब", "आवेदन", "りんご", "アプリケーション", "تفاحة", "تطبيق",
+
+                // Insert and Search with Emojis
+                "😀", "😃", "😄", "😅", "😂", "🍎", "🍏", "🍐", "🍊", "🍋",
+
+                // Insert and Search with Accented Characters
+                "éléphant", "école", "étudiant", "über", "groß", "straße",
+
+                // Edge Cases
+                "", "a", "字", "م",
+
+                // Mixed Scripts
+                "apple苹果", "app应用", "applicationआवेदन", "こんにちはworld", "مرحباworld",
+
+                // Special Symbols and Punctuation
+                "hello!", "hello?", "hello.", "hello,", "!@#$%^&*()_+-=[]{}|;':,.<>/?",
+
+                // Long Strings
+                "a".repeat(1000), "あ".repeat(500),
+
+                // Palindromes
+                "madam", "racecar", "あいいあ",
+
+                // Anagrams
+                "listen", "silent", "inlets", "学校", "校学");
+
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void insertingDuplicates() throws HyracksDataException {
+        List<String> fieldNames =
+                List.of("ant", "age", "agent", "agitation", "agitated", "name", "aghast", "anti", "anon");
+
+        for (String fieldName : fieldNames) {
+            trie.insert(UTF8StringPointable.generateUTF8Pointable(fieldName));
+            dictionary.getOrCreateFieldNameIndex(UTF8StringPointable.generateUTF8Pointable(fieldName));
+        }
+
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void insertingRandomTest() throws IOException {
+        String fileName = "dict-words.txt";
+        try {
+            UnicodeFieldNameGenerator.genFields(1000, 10, 200, 100, 0.9F, fileName);
+
+            // reading the fields and storing in the array.
+            List<IValueReference> fields = new ArrayList<>();
+            try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
+                String fieldName;
+                while ((fieldName = reader.readLine()) != null) {
+                    fields.add(UTF8StringPointable.generateUTF8Pointable(fieldName));
+                }
+            }
+
+            for (IValueReference fieldName : fields) {
+                trie.insert(fieldName);
+            }
+
+            for (IValueReference fieldName : fields) {
+                Assert.assertNotEquals(NOT_FOUND_INDEX, trie.lookup(fieldName));
+            }
+
+            // check serde of unicode
+            ByteArrayAccessibleOutputStream baaos;
+            RewindableDataOutputStream dos;
+            baaos = new ByteArrayAccessibleOutputStream();
+            dos = new RewindableDataOutputStream(baaos);
+            trie.serialize(dos);
+
+            DataInputStream inputStream =
+                    new DataInputStream(new ByteArrayAccessibleInputStream(baaos.getByteArray(), 0, baaos.size()));
+            FieldNameTrie newDictionary = FieldNameTrie.deserialize(inputStream);
+
+            // looking up on the de-serialized dictionary
+            for (IValueReference fieldName : fields) {
+                Assert.assertNotEquals(NOT_FOUND_INDEX, newDictionary.lookup(fieldName));
+            }
+        } finally {
+            // deleting the file
+            File dictFile = new File(fileName);
+            dictFile.delete();
+        }
+    }
+
+    @Test
+    public void serializeDeserializeTest() throws IOException {
+        List<String> fieldNames =
+                List.of("ant", "age", "agent", "agitation", "agitated", "name", "aghast", "anti", "anon");
+
+        assertInsertAndLookup(fieldNames);
+        int expectedIndex;
+
+        ByteArrayAccessibleOutputStream baaos;
+        RewindableDataOutputStream dos;
+        baaos = new ByteArrayAccessibleOutputStream();
+        dos = new RewindableDataOutputStream(baaos);
+        trie.serialize(dos);
+
+        DataInputStream inputStream =
+                new DataInputStream(new ByteArrayAccessibleInputStream(baaos.getByteArray(), 0, baaos.size()));
+        FieldNameTrie newDictionary = FieldNameTrie.deserialize(inputStream);
+
+        // looking up on the de-serialized dictionary
+        expectedIndex = 0;
+        for (String fieldName : fieldNames) {
+            Assert.assertEquals(expectedIndex,
+                    newDictionary.lookup(UTF8StringPointable.generateUTF8Pointable(fieldName)));
+            expectedIndex++;
+        }
+    }
+
+    @Test
+    public void nameAlreadyPartOfAnotherField() throws HyracksDataException {
+        String prefix = "level_";
+        List<String> fieldNames = new ArrayList<>();
+
+        for (int i = 24; i >= 0; i--) {
+            fieldNames.add(prefix + i);
+        }
+
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void chaining() throws HyracksDataException {
+        List<String> fieldNames = new ArrayList<>();
+
+        String word = "";
+        for (int i = 0; i < 500; i++) {
+            word += "a";
+            fieldNames.add(word);
+        }
+
+        assertInsertAndLookup(fieldNames);
+    }
+
+    @Test
+    public void split() throws HyracksDataException {
+        List<String> fieldNames = new ArrayList<>();
+
+        String word = "";
+        for (int i = 0; i < 500; i++) {
+            word += "a";
+            fieldNames.add(word);
+        }
+
+        Collections.reverse(fieldNames);
+        assertInsertAndLookup(fieldNames);
+    }
+
+    private void assertInsertAndLookup(List<String> fieldNames) throws HyracksDataException {
+        for (String fieldName : fieldNames) {
+            Assert.assertEquals(dictionary.getOrCreateFieldNameIndex(fieldName),
+                    trie.insert(UTF8StringPointable.generateUTF8Pointable(fieldName)));
+        }
+
+        int expectedIndex = 0;
+        for (String fieldName : fieldNames) {
+            IValueReference serializedFieldName = UTF8StringPointable.generateUTF8Pointable(fieldName);
+            Assert.assertEquals(expectedIndex, trie.lookup(serializedFieldName));
+            Assert.assertEquals(expectedIndex, dictionary.getFieldNameIndex(fieldName));
+            Assert.assertEquals(expectedIndex, dictionary.getOrCreateFieldNameIndex(serializedFieldName));
+            expectedIndex++;
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/metadata/trie/UnicodeFieldNameGenerator.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/metadata/trie/UnicodeFieldNameGenerator.java
new file mode 100644
index 0000000..a02ea02
--- /dev/null
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/metadata/trie/UnicodeFieldNameGenerator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.metadata.trie;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
+public class UnicodeFieldNameGenerator {
+
+    // Unicode ranges for different scripts and symbols
+    private static final char[] LATIN_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
+    private static final char[] NUMBER_CHARS = "0123456789".toCharArray();
+    private static final char[] GREEK_CHARS = "αβγδεζηθικλμνξοπρστυφχψωΑΒΓΔΕΖΗΘΙΚΛΜΝΞΟΠΡΣΤΥΦΧΨΩ".toCharArray();
+    private static final char[] CYRILLIC_CHARS =
+            "абвгдеёжзийклмнопрстуфхцчшщъыьэюяАБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯ".toCharArray();
+    private static final char[] CHINESE_CHARS =
+            "的一是了我不人在他有这个上们来到时大地为子中你说生国年着就那和要她出也得里后自以会家可下而过天去能对小多然于心学好".toCharArray();
+    private static final char[] JAPANESE_CHARS =
+            "あいうえおかきくけこさしすせそたちつてとなにぬねのはひふへほまみむめもやゆよらりるれろわをんアイウエオカキクケコサシスセソタチツテトナニヌネノハヒフヘホマミムメモヤユヨラリルレロワヲン"
+                    .toCharArray();
+    private static final char[] EMOJIS = "😀😁😂🤣😃😄😅😆😉😊😋😎😍😘".toCharArray();
+
+    private static final char[][] UNICODE_RANGES =
+            { LATIN_CHARS, NUMBER_CHARS, GREEK_CHARS, CYRILLIC_CHARS, CHINESE_CHARS, JAPANESE_CHARS, EMOJIS };
+    private static final Random RANDOM = new Random();
+
+    public static void genFields(int numFields, int minLength, int maxLength, int biasMaxLength, float bias,
+            String fileName) {
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) {
+            int biasedCount = (int) (numFields * bias);
+            int remainingCount = numFields - biasedCount;
+
+            for (int i = 0; i < biasedCount; i++) {
+                int length = RANDOM.nextInt(biasMaxLength - minLength + 1) + minLength;
+                String word = generateFieldName(length);
+                writer.write(word);
+                writer.newLine();
+            }
+
+            for (int i = 0; i < remainingCount; i++) {
+                int length = RANDOM.nextInt(maxLength - biasMaxLength + 1) + biasMaxLength;
+                String word = generateFieldName(length);
+                writer.write(word);
+                writer.newLine();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static String generateFieldName(int length) {
+        StringBuilder fieldName = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            char[] selectedRange = UNICODE_RANGES[RANDOM.nextInt(UNICODE_RANGES.length)];
+            char randomChar = selectedRange[RANDOM.nextInt(selectedRange.length)];
+            fieldName.append(randomChar);
+        }
+        return fieldName.toString();
+    }
+
+}