[ASTERIXDB-2516][COMP] Avoid writing field names & values when comparing records

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
ARecordVisitablePointable writes field names, field tags, and field values
first before giving access to the record information. This is not ideal for
comparison. A different record accessor is needed for comparison. Also, the
field names should be sorted which ARecordVisitablePointable does not provide.
- avoid this writing when a pointable to the name & value can be obtained
(especially when the field value already includes the tag)
- use UTF8Pointable cached values (string length, meta length) to compare
instead of using the string comparator which would recalculate these values
- refactored some common code.
- removed not used methods in ARecordPointable

Change-Id: I19ac95a91749b2983bf06f763e463521a97a261c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3280
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java
deleted file mode 100644
index a42e5aa..0000000
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.dataflow.data.nontagged;
-
-import static org.apache.asterix.om.types.ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
-import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
-
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class CompareHashUtil {
-
-    private CompareHashUtil() {
-    }
-
-    public static Comparator<IVisitablePointable> createFieldNamesComp(IBinaryComparator stringComp) {
-        return new Comparator<IVisitablePointable>() {
-            @Override
-            public int compare(IVisitablePointable name1, IVisitablePointable name2) {
-                try {
-                    return stringComp.compare(name1.getByteArray(), name1.getStartOffset() + 1, name1.getLength() - 1,
-                            name2.getByteArray(), name2.getStartOffset() + 1, name2.getLength() - 1);
-                } catch (HyracksDataException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    }
-
-    public static int addToHeap(List<IVisitablePointable> recordFNames, List<IVisitablePointable> recordFValues,
-            PriorityQueue<IVisitablePointable> names) {
-        // do not add fields whose value is missing, they don't exist in reality
-        int length = recordFNames.size();
-        IVisitablePointable fieldValue;
-        int count = 0;
-        for (int i = 0; i < length; i++) {
-            fieldValue = recordFValues.get(i);
-            if (fieldValue.getByteArray()[fieldValue.getStartOffset()] != SERIALIZED_MISSING_TYPE_TAG) {
-                names.add(recordFNames.get(i));
-                count++;
-            }
-        }
-        return count;
-    }
-
-    public static int getIndex(List<IVisitablePointable> names, IVisitablePointable instance) {
-        int size = names.size();
-        for (int i = 0; i < size; i++) {
-            if (instance == names.get(i)) {
-                return i;
-            }
-        }
-        throw new IllegalStateException();
-    }
-
-    public static IAType getType(ARecordType recordType, int fieldIdx, ATypeTag fieldTag) throws HyracksDataException {
-        IAType[] fieldTypes = recordType.getFieldTypes();
-        if (fieldIdx >= fieldTypes.length) {
-            return fieldTag.isDerivedType() ? DefaultOpenFieldType.getDefaultOpenFieldType(fieldTag)
-                    : TypeTagUtil.getBuiltinTypeByTag(fieldTag);
-        }
-        return fieldTypes[fieldIdx];
-    }
-}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java
index 098c81f..6d7ad76 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java
@@ -18,23 +18,20 @@
  */
 package org.apache.asterix.dataflow.data.nontagged.comparators;
 
-import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING;
+import static org.apache.asterix.om.util.container.ObjectFactories.RECORD_FACTORY;
+import static org.apache.asterix.om.util.container.ObjectFactories.STORAGE_FACTORY;
+import static org.apache.asterix.om.util.container.ObjectFactories.VOID_FACTORY;
 
 import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
 
 import org.apache.asterix.dataflow.data.common.ListAccessorUtil;
-import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADayTimeDurationSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AYearMonthDurationSerializerDeserializer;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.nonvisitor.RecordField;
+import org.apache.asterix.om.pointables.nonvisitor.SortedRecord;
 import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -43,9 +40,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy.Domain;
-import org.apache.asterix.om.util.container.IObjectPool;
 import org.apache.asterix.om.util.container.ListObjectPool;
-import org.apache.asterix.om.util.container.ObjectFactories;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -61,64 +56,43 @@
  */
 abstract class AbstractAGenericBinaryComparator implements IBinaryComparator {
 
-    // BOOLEAN
     private final IBinaryComparator ascBoolComp = BooleanBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // STRING
     private final IBinaryComparator ascStrComp =
             new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY).createBinaryComparator();
-    // BINARY
     private final IBinaryComparator ascByteArrayComp =
             new PointableBinaryComparatorFactory(ByteArrayPointable.FACTORY).createBinaryComparator();
-    // RECTANGLE
     private final IBinaryComparator ascRectangleComp =
             ARectanglePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // CIRCLE
     private final IBinaryComparator ascCircleComp =
             ACirclePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // DURATION
     private final IBinaryComparator ascDurationComp =
             ADurationPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // INTERVAL
     private final IBinaryComparator ascIntervalComp =
             AIntervalAscPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // LINE
     private final IBinaryComparator ascLineComp = ALinePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // POINT
     private final IBinaryComparator ascPointComp =
             APointPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // POINT3D
     private final IBinaryComparator ascPoint3DComp =
             APoint3DPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // POLYGON
     private final IBinaryComparator ascPolygonComp =
             APolygonPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // UUID
     private final IBinaryComparator ascUUIDComp = AUUIDPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-    // RAW
     private final IBinaryComparator rawComp = RawBinaryComparatorFactory.INSTANCE.createBinaryComparator();
 
-    // these fields can be null
+    // the type fields can be null
     protected final IAType leftType;
     protected final IAType rightType;
-    private final IObjectPool<IMutableValueStorage, Void> storageAllocator;
-    private final IObjectPool<IPointable, Void> voidPointableAllocator;
-    // used for record comparison, sorting field names
-    private final PointableAllocator recordAllocator;
-    private final IObjectPool<PriorityQueue<IVisitablePointable>, Void> heapAllocator;
-    private final Comparator<IVisitablePointable> fieldNamesComparator;
+    private final ListObjectPool<IMutableValueStorage, Void> storageAllocator = new ListObjectPool<>(STORAGE_FACTORY);
+    private final ListObjectPool<IPointable, Void> voidPointableAllocator = new ListObjectPool<>(VOID_FACTORY);
+    private final ListObjectPool<SortedRecord, ARecordType> recordPool = new ListObjectPool<>(RECORD_FACTORY);
 
     AbstractAGenericBinaryComparator(IAType leftType, IAType rightType) {
         // factory should have already made sure to get the actual type
         this.leftType = leftType;
         this.rightType = rightType;
-        this.storageAllocator = new ListObjectPool<>(ObjectFactories.STORAGE_FACTORY);
-        this.voidPointableAllocator = new ListObjectPool<>(ObjectFactories.VOID_FACTORY);
-        this.recordAllocator = new PointableAllocator();
-        this.fieldNamesComparator = CompareHashUtil.createFieldNamesComp(ascStrComp);
-        this.heapAllocator = new ListObjectPool<>((type) -> new PriorityQueue<>(fieldNamesComparator));
     }
 
-    protected int compare(IAType leftType, byte[] b1, int s1, int l1, IAType rightType, byte[] b2, int s2, int l2)
+    protected final int compare(IAType leftType, byte[] b1, int s1, int l1, IAType rightType, byte[] b2, int s2, int l2)
             throws HyracksDataException {
         if (b1[s1] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
             return b2[s2] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG ? 0 : -1;
@@ -132,16 +106,14 @@
         }
         ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]);
         ATypeTag tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]);
-        // if one of tag is null, that means we are dealing with an empty byte array in one side.
-        // and, we don't need to continue. We just compare raw byte by byte.
+        // tag being null could mean several things among of which is that the passed args are not tagged
         if (tag1 == null || tag2 == null) {
             return rawComp.compare(b1, s1, l1, b2, s2, l2);
         }
         if (ATypeHierarchy.isCompatible(tag1, tag2) && ATypeHierarchy.getTypeDomain(tag1) == Domain.NUMERIC) {
             return ComparatorUtil.compareNumbers(tag1, b1, s1 + 1, tag2, b2, s2 + 1);
         }
-        // currently only numbers are compatible. if two tags are not compatible, we compare the tags.
-        // this is especially useful when we need to generate some order between any two types.
+        // currently only numbers are compatible. if two tags are not compatible, we compare the tags to generate order
         if (tag1 != tag2) {
             return Byte.compare(b1[s1], b2[s2]);
         }
@@ -191,7 +163,6 @@
             case OBJECT:
                 return compareRecords(leftType, b1, s1, l1, rightType, b2, s2, l2);
             default:
-                // we include typeTag in comparison to compare between two type to enforce some ordering
                 return rawComp.compare(b1, s1, l1, b2, s2, l2);
         }
     }
@@ -248,53 +219,34 @@
         }
         ARecordType leftRecordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(leftType, ATypeTag.OBJECT);
         ARecordType rightRecordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(rightType, ATypeTag.OBJECT);
-        ARecordVisitablePointable leftRecord = recordAllocator.allocateRecordValue(leftRecordType);
-        ARecordVisitablePointable rightRecord = recordAllocator.allocateRecordValue(rightRecordType);
-        PriorityQueue<IVisitablePointable> leftNamesHeap = null, rightNamesHeap = null;
+        SortedRecord leftRecord = recordPool.allocate(leftRecordType);
+        SortedRecord rightRecord = recordPool.allocate(rightRecordType);
+        IPointable leftFieldValue = voidPointableAllocator.allocate(null);
+        IPointable rightFieldValue = voidPointableAllocator.allocate(null);
+        // TODO(ali): this is not ideal. should be removed when tagged pointables are introduced
+        ArrayBackedValueStorage leftStorage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
+        ArrayBackedValueStorage rightStorage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
         try {
-            leftRecord.set(b1, s1, l1);
-            rightRecord.set(b2, s2, l2);
-            List<IVisitablePointable> leftFieldsNames = leftRecord.getFieldNames();
-            List<IVisitablePointable> rightFieldsNames = rightRecord.getFieldNames();
-            List<IVisitablePointable> leftFieldsValues = leftRecord.getFieldValues();
-            List<IVisitablePointable> rightFieldsValues = rightRecord.getFieldValues();
-            leftNamesHeap = heapAllocator.allocate(null);
-            rightNamesHeap = heapAllocator.allocate(null);
-            leftNamesHeap.clear();
-            rightNamesHeap.clear();
-            int numLeftValuedFields = CompareHashUtil.addToHeap(leftFieldsNames, leftFieldsValues, leftNamesHeap);
-            int numRightValuedFields = CompareHashUtil.addToHeap(rightFieldsNames, rightFieldsValues, rightNamesHeap);
-            if (numLeftValuedFields == 0 && numRightValuedFields == 0) {
-                return 0;
-            } else if (numLeftValuedFields == 0) {
-                return -1;
-            } else if (numRightValuedFields == 0) {
-                return 1;
-            }
-            int result;
-            int leftFieldIdx, rightFieldIdx;
+            leftRecord.reset(b1, s1);
+            rightRecord.reset(b2, s2);
             IAType leftFieldType, rightFieldType;
-            IVisitablePointable leftFieldName, leftFieldValue, rightFieldName, rightFieldValue;
-            ATypeTag fieldTag;
-            while (!leftNamesHeap.isEmpty() && !rightNamesHeap.isEmpty()) {
-                leftFieldName = leftNamesHeap.poll();
-                rightFieldName = rightNamesHeap.poll();
+            RecordField leftField, rightField;
+            int result;
+            while (!leftRecord.isEmpty() && !rightRecord.isEmpty()) {
+                leftField = leftRecord.poll();
+                rightField = rightRecord.poll();
                 // compare the names first
-                result = ascStrComp.compare(leftFieldName.getByteArray(), leftFieldName.getStartOffset() + 1,
-                        leftFieldName.getLength() - 1, rightFieldName.getByteArray(),
-                        rightFieldName.getStartOffset() + 1, rightFieldName.getLength() - 1);
+                result = RecordField.FIELD_NAME_COMP.compare(leftField, rightField);
                 if (result != 0) {
                     return result;
                 }
                 // then compare the values if the names are equal
-                leftFieldIdx = CompareHashUtil.getIndex(leftFieldsNames, leftFieldName);
-                rightFieldIdx = CompareHashUtil.getIndex(rightFieldsNames, rightFieldName);
-                leftFieldValue = leftFieldsValues.get(leftFieldIdx);
-                rightFieldValue = rightFieldsValues.get(rightFieldIdx);
-                fieldTag = VALUE_TYPE_MAPPING[leftFieldValue.getByteArray()[leftFieldValue.getStartOffset()]];
-                leftFieldType = CompareHashUtil.getType(leftRecordType, leftFieldIdx, fieldTag);
-                fieldTag = VALUE_TYPE_MAPPING[rightFieldValue.getByteArray()[rightFieldValue.getStartOffset()]];
-                rightFieldType = CompareHashUtil.getType(rightRecordType, rightFieldIdx, fieldTag);
+                leftStorage.reset();
+                rightStorage.reset();
+                leftRecord.getFieldValue(leftField, leftFieldValue, leftStorage);
+                rightRecord.getFieldValue(rightField, rightFieldValue, rightStorage);
+                leftFieldType = leftRecord.getFieldType(leftField);
+                rightFieldType = rightRecord.getFieldType(rightField);
                 result = compare(leftFieldType, leftFieldValue.getByteArray(), leftFieldValue.getStartOffset(),
                         leftFieldValue.getLength(), rightFieldType, rightFieldValue.getByteArray(),
                         rightFieldValue.getStartOffset(), rightFieldValue.getLength());
@@ -302,17 +254,16 @@
                     return result;
                 }
             }
-
-            return Integer.compare(numLeftValuedFields, numRightValuedFields);
+            return Integer.compare(leftRecord.size(), rightRecord.size());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
         } finally {
-            recordAllocator.freeRecord(rightRecord);
-            recordAllocator.freeRecord(leftRecord);
-            if (rightNamesHeap != null) {
-                heapAllocator.free(rightNamesHeap);
-            }
-            if (leftNamesHeap != null) {
-                heapAllocator.free(leftNamesHeap);
-            }
+            recordPool.free(rightRecord);
+            recordPool.free(leftRecord);
+            voidPointableAllocator.free(rightFieldValue);
+            voidPointableAllocator.free(leftFieldValue);
+            storageAllocator.free(rightStorage);
+            storageAllocator.free(leftStorage);
         }
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java
index b83e248..aa902bb 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java
@@ -30,7 +30,6 @@
 
 import org.apache.asterix.dataflow.data.common.ILogicalBinaryComparator;
 import org.apache.asterix.dataflow.data.common.ListAccessorUtil;
-import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.pointables.ARecordVisitablePointable;
@@ -44,6 +43,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.container.IObjectPool;
 import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
@@ -275,8 +275,8 @@
                                 if (leftFTag == ATypeTag.NULL || rightFTag == ATypeTag.NULL) {
                                     tempCompResult = Result.NULL;
                                 } else if (leftFTag.isDerivedType() && rightFTag.isDerivedType()) {
-                                    leftFieldType = CompareHashUtil.getType(leftRecordType, i, leftFTag);
-                                    rightFieldType = CompareHashUtil.getType(rightRecordType, k, rightFTag);
+                                    leftFieldType = RecordUtil.getType(leftRecordType, i, leftFTag);
+                                    rightFieldType = RecordUtil.getType(rightRecordType, k, rightFTag);
                                     tempCompResult = compareComplex(leftFieldType, leftFTag, leftFieldValue,
                                             rightFieldType, rightFTag, rightFieldValue);
                                 } else {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java
index 2ef6e80..020fbd1 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java
@@ -18,19 +18,16 @@
  */
 package org.apache.asterix.dataflow.data.nontagged.hash;
 
-import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING;
+import static org.apache.asterix.om.util.container.ObjectFactories.RECORD_FACTORY;
+import static org.apache.asterix.om.util.container.ObjectFactories.STORAGE_FACTORY;
+import static org.apache.asterix.om.util.container.ObjectFactories.VOID_FACTORY;
 
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
 
 import org.apache.asterix.dataflow.data.common.ListAccessorUtil;
-import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.nonvisitor.RecordField;
+import org.apache.asterix.om.pointables.nonvisitor.SortedRecord;
 import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -41,16 +38,13 @@
 import org.apache.asterix.om.types.hierachy.IntegerToDoubleTypeConvertComputer;
 import org.apache.asterix.om.util.container.IObjectPool;
 import org.apache.asterix.om.util.container.ListObjectPool;
-import org.apache.asterix.om.util.container.ObjectFactories;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHash;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class AMurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
@@ -83,24 +77,15 @@
 
         private final ArrayBackedValueStorage valueBuffer = new ArrayBackedValueStorage();
         private final DataOutput valueOut = valueBuffer.getDataOutput();
-        private final IObjectPool<IPointable, Void> voidPointableAllocator;
-        private final IObjectPool<IMutableValueStorage, Void> storageAllocator;
+        private final IObjectPool<IPointable, Void> voidPointableAllocator = new ListObjectPool<>(VOID_FACTORY);
+        private final IObjectPool<IMutableValueStorage, Void> storageAllocator = new ListObjectPool<>(STORAGE_FACTORY);
+        private final IObjectPool<SortedRecord, ARecordType> recordPool = new ListObjectPool<>(RECORD_FACTORY);
         private final IAType type;
         private final int seed;
-        // used for record hashing, sorting field names first
-        private final PointableAllocator recordAllocator;
-        private final IObjectPool<PriorityQueue<IVisitablePointable>, Void> heapAllocator;
-        private final Comparator<IVisitablePointable> fieldNamesComparator;
 
         private GenericHashFunction(IAType type, int seed) {
             this.type = type;
             this.seed = seed;
-            this.voidPointableAllocator = new ListObjectPool<>(ObjectFactories.VOID_FACTORY);
-            this.storageAllocator = new ListObjectPool<>(ObjectFactories.STORAGE_FACTORY);
-            this.recordAllocator = new PointableAllocator();
-            this.fieldNamesComparator = CompareHashUtil.createFieldNamesComp(
-                    new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY).createBinaryComparator());
-            this.heapAllocator = new ListObjectPool<>((arg) -> new PriorityQueue<>(fieldNamesComparator));
         }
 
         @Override
@@ -178,35 +163,28 @@
                 return MurmurHash3BinaryHash.hash(bytes, offset, length, seed);
             }
             ARecordType recordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(type, ATypeTag.OBJECT);
-            ARecordVisitablePointable record = recordAllocator.allocateRecordValue(recordType);
-            PriorityQueue<IVisitablePointable> namesHeap = heapAllocator.allocate(null);
+            SortedRecord record = recordPool.allocate(recordType);
+            IPointable fieldValue = voidPointableAllocator.allocate(null);
+            // TODO(ali): this is not ideal. should be removed when tagged pointables are introduced
+            ArrayBackedValueStorage storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
             try {
-                record.set(bytes, offset, length);
-                namesHeap.clear();
-                List<IVisitablePointable> fieldsNames = record.getFieldNames();
-                List<IVisitablePointable> fieldsValues = record.getFieldValues();
-                CompareHashUtil.addToHeap(fieldsNames, fieldsValues, namesHeap);
-                IVisitablePointable fieldName, fieldValue;
-                IAType fieldType;
-                ATypeTag fieldTag;
+                record.reset(bytes, offset);
                 int hash = seed;
-                int fieldIdx;
-                while (!namesHeap.isEmpty()) {
-                    fieldName = namesHeap.poll();
-                    // TODO(ali): currently doing another lookup to find the target field index and get its value & type
-                    fieldIdx = CompareHashUtil.getIndex(fieldsNames, fieldName);
-                    fieldValue = fieldsValues.get(fieldIdx);
-                    fieldTag = VALUE_TYPE_MAPPING[fieldValue.getByteArray()[fieldValue.getStartOffset()]];
-                    fieldType = CompareHashUtil.getType(recordType, fieldIdx, fieldTag);
-                    hash ^= MurmurHash3BinaryHash.hash(fieldName.getByteArray(), fieldName.getStartOffset(),
-                            fieldName.getLength(), seed)
-                            ^ hash(fieldType, fieldValue.getByteArray(), fieldValue.getStartOffset(),
-                                    fieldValue.getLength());
+                while (!record.isEmpty()) {
+                    RecordField field = record.poll();
+                    storage.reset();
+                    record.getFieldValue(field, fieldValue, storage);
+                    IAType fieldType = record.getFieldType(field);
+                    hash ^= field.getName().hash() ^ hash(fieldType, fieldValue.getByteArray(),
+                            fieldValue.getStartOffset(), fieldValue.getLength());
                 }
                 return hash;
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
             } finally {
-                recordAllocator.freeRecord(record);
-                heapAllocator.free(namesHeap);
+                recordPool.free(record);
+                voidPointableAllocator.free(fieldValue);
+                storageAllocator.free(storage);
             }
         }
     }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
index 7089fd6..193ff7d 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.container.IObjectFactory;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.asterix.om.utils.ResettableByteArrayOutputStream;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.string.UTF8StringWriter;
@@ -197,14 +198,12 @@
                 for (int fieldNumber = 0; fieldNumber < numberOfSchemaFields; fieldNumber++) {
                     if (hasOptionalFields) {
                         byte b1 = b[nullBitMapOffset + fieldNumber / 4];
-                        int p = 1 << (7 - 2 * (fieldNumber % 4));
-                        if ((b1 & p) == 0) {
+                        if (RecordUtil.isNull(b1, fieldNumber)) {
                             // set null value (including type tag inside)
                             fieldValues.add(nullReference);
                             continue;
                         }
-                        p = 1 << (7 - 2 * (fieldNumber % 4) - 1);
-                        if ((b1 & p) == 0) {
+                        if (RecordUtil.isMissing(b1, fieldNumber)) {
                             // set missing value (including type tag inside)
                             fieldValues.add(missingReference);
                             continue;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 4ffbf47..8820ce2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -46,16 +46,16 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 
-/*
+/**
  * This class interprets the binary data representation of a record.
  *
  * Record {
  *   byte tag;
  *   int length;
- *   byte isExpanded;
+ *   byte isExpanded?;
  *   int openOffset?;
  *   int numberOfClosedFields;
- *   byte[ceil (numberOfFields / 4)] nullBitMap; // 1 bit per field, "1" means field is Null for this record
+ *   byte[ceil (numberOfFields / 4)] nullBitMap; // 2 bits per field, "1" means field is null, "2" field is missing
  *   int[numberOfClosedFields] closedFieldOffset;
  *   IPointable[numberOfClosedFields] fieldValue;
  *   int numberOfOpenFields?;
@@ -107,20 +107,14 @@
 
     }
 
-    public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new IObjectFactory<IPointable, ATypeTag>() {
-        @Override
-        public IPointable create(ATypeTag type) {
-            return new ARecordPointable();
-        }
-    };
-
-    private static final int TAG_SIZE = 1;
-    private static final int RECORD_LENGTH_SIZE = 4;
-    private static final int EXPANDED_SIZE = 1;
-    private static final int OPEN_OFFSET_SIZE = 4;
-    private static final int CLOSED_COUNT_SIZE = 4;
-    private static final int FIELD_OFFSET_SIZE = 4;
-    private static final int OPEN_COUNT_SIZE = 4;
+    public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = type -> new ARecordPointable();
+    static final int TAG_SIZE = 1;
+    static final int RECORD_LENGTH_SIZE = 4;
+    static final int EXPANDED_SIZE = 1;
+    static final int OPEN_OFFSET_SIZE = 4;
+    static final int CLOSED_COUNT_SIZE = 4;
+    static final int FIELD_OFFSET_SIZE = 4;
+    static final int OPEN_COUNT_SIZE = 4;
     private static final int OPEN_FIELD_HASH_SIZE = 4;
     private static final int OPEN_FIELD_OFFSET_SIZE = 4;
     private static final int OPEN_FIELD_HEADER = OPEN_FIELD_HASH_SIZE + OPEN_FIELD_OFFSET_SIZE;
@@ -129,97 +123,46 @@
         return recordType == null || recordType.isOpen();
     }
 
-    public int getSchemeFieldCount(ARecordType recordType) {
+    public final int getSchemeFieldCount(ARecordType recordType) {
         return recordType.getFieldNames().length;
     }
 
-    public byte getTag() {
-        return BytePointable.getByte(bytes, getTagOffset());
-    }
-
-    public int getTagOffset() {
-        return start;
-    }
-
-    public int getTagSize() {
-        return TAG_SIZE;
-    }
-
     @Override
     public int getLength() {
-        return IntegerPointable.getInteger(bytes, getLengthOffset());
+        return IntegerPointable.getInteger(bytes, start + TAG_SIZE);
     }
 
-    public int getLengthOffset() {
-        return getTagOffset() + getTagSize();
+    private boolean isExpanded(ARecordType recordType) {
+        return isOpen(recordType) && BooleanPointable.getBoolean(bytes, start + TAG_SIZE + RECORD_LENGTH_SIZE);
     }
 
-    public int getLengthSize() {
-        return RECORD_LENGTH_SIZE;
+    private int getOpenPartOffset(ARecordType recordType) {
+        return start + TAG_SIZE + RECORD_LENGTH_SIZE + (isOpen(recordType) ? EXPANDED_SIZE : 0);
     }
 
-    public boolean isExpanded(ARecordType recordType) {
-        if (isOpen(recordType)) {
-            return BooleanPointable.getBoolean(bytes, getExpandedOffset(recordType));
-        }
-        return false;
+    private int getNullBitmapOffset(ARecordType recordType) {
+        return getOpenPartOffset(recordType) + (isExpanded(recordType) ? OPEN_OFFSET_SIZE : 0) + CLOSED_COUNT_SIZE;
     }
 
-    public int getExpandedOffset(ARecordType recordType) {
-        return getLengthOffset() + getLengthSize();
-    }
-
-    public int getExpandedSize(ARecordType recordType) {
-        return isOpen(recordType) ? EXPANDED_SIZE : 0;
-    }
-
-    public int getOpenPartOffset(ARecordType recordType) {
-        return getExpandedOffset(recordType) + getExpandedSize(recordType);
-    }
-
-    public int getOpenPartSize(ARecordType recordType) {
-        return isExpanded(recordType) ? OPEN_OFFSET_SIZE : 0;
-    }
-
-    public int getClosedFieldCount(ARecordType recordType) {
-        return IntegerPointable.getInteger(bytes, getClosedFieldCountOffset(recordType));
-    }
-
-    public int getClosedFieldCountOffset(ARecordType recordType) {
-        return getOpenPartOffset(recordType) + getOpenPartSize(recordType);
-    }
-
-    public int getClosedFieldCountSize(ARecordType recordType) {
-        return CLOSED_COUNT_SIZE;
-    }
-
-    public int getNullBitmapOffset(ARecordType recordType) {
-        return getClosedFieldCountOffset(recordType) + getClosedFieldCountSize(recordType);
-    }
-
-    public int getNullBitmapSize(ARecordType recordType) {
+    private int getNullBitmapSize(ARecordType recordType) {
         return RecordUtil.computeNullBitmapSize(recordType);
     }
 
     public boolean isClosedFieldNull(ARecordType recordType, int fieldId) {
-        if (getNullBitmapSize(recordType) > 0) {
-            return (bytes[getNullBitmapOffset(recordType) + fieldId / 4] & (1 << (7 - 2 * (fieldId % 4)))) == 0;
-        }
-        return false;
+        return getNullBitmapSize(recordType) > 0
+                && RecordUtil.isNull(bytes[getNullBitmapOffset(recordType) + fieldId / 4], fieldId);
     }
 
-    public boolean isClosedFieldMissing(ARecordType recordType, int fieldId) {
-        if (getNullBitmapSize(recordType) > 0) {
-            return (bytes[getNullBitmapOffset(recordType) + fieldId / 4] & (1 << (7 - 2 * (fieldId % 4) - 1))) == 0;
-        }
-        return false;
+    private boolean isClosedFieldMissing(ARecordType recordType, int fieldId) {
+        return getNullBitmapSize(recordType) > 0
+                && RecordUtil.isMissing(bytes[getNullBitmapOffset(recordType) + fieldId / 4], fieldId);
     }
 
     // -----------------------
     // Closed field accessors.
     // -----------------------
 
-    public void getClosedFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
+    public final void getClosedFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
         if (isClosedFieldNull(recordType, fieldId)) {
             dOut.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
         } else if (isClosedFieldMissing(recordType, fieldId)) {
@@ -238,27 +181,28 @@
      * @param pointable
      * @throws IOException
      */
-    public void getClosedFieldValue(ARecordType recordType, int fieldId, IPointable pointable) throws IOException {
+    public final void getClosedFieldValue(ARecordType recordType, int fieldId, IPointable pointable)
+            throws IOException {
         if (isClosedFieldNull(recordType, fieldId) || isClosedFieldMissing(recordType, fieldId)) {
             throw new IllegalStateException("Can't read a null or missing field");
         }
         pointable.set(bytes, getClosedFieldOffset(recordType, fieldId), getClosedFieldSize(recordType, fieldId));
     }
 
-    public String getClosedFieldName(ARecordType recordType, int fieldId) {
+    private String getClosedFieldName(ARecordType recordType, int fieldId) {
         return recordType.getFieldNames()[fieldId];
     }
 
-    public void getClosedFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
+    public final void getClosedFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
         dOut.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
         utf8Writer.writeUTF8(getClosedFieldName(recordType, fieldId), dOut);
     }
 
-    public byte getClosedFieldTag(ARecordType recordType, int fieldId) {
+    public final byte getClosedFieldTag(ARecordType recordType, int fieldId) {
         return getClosedFieldType(recordType, fieldId).getTypeTag().serialize();
     }
 
-    public IAType getClosedFieldType(ARecordType recordType, int fieldId) {
+    public final IAType getClosedFieldType(ARecordType recordType, int fieldId) {
         IAType aType = recordType.getFieldTypes()[fieldId];
         if (NonTaggedFormatUtil.isOptional(aType)) {
             // optional field: add the embedded non-null type tag
@@ -267,7 +211,7 @@
         return aType;
     }
 
-    public int getClosedFieldSize(ARecordType recordType, int fieldId) throws HyracksDataException {
+    public final int getClosedFieldSize(ARecordType recordType, int fieldId) throws HyracksDataException {
         if (isClosedFieldNull(recordType, fieldId)) {
             return 0;
         }
@@ -275,7 +219,7 @@
                 getClosedFieldType(recordType, fieldId).getTypeTag(), false);
     }
 
-    public int getClosedFieldOffset(ARecordType recordType, int fieldId) {
+    public final int getClosedFieldOffset(ARecordType recordType, int fieldId) {
         int offset = getNullBitmapOffset(recordType) + getNullBitmapSize(recordType) + fieldId * FIELD_OFFSET_SIZE;
         return start + IntegerPointable.getInteger(bytes, offset);
     }
@@ -284,15 +228,15 @@
     // Open field count.
     // -----------------------
 
-    public int getOpenFieldCount(ARecordType recordType) {
+    public final int getOpenFieldCount(ARecordType recordType) {
         return isExpanded(recordType) ? IntegerPointable.getInteger(bytes, getOpenFieldCountOffset(recordType)) : 0;
     }
 
-    public int getOpenFieldCountSize(ARecordType recordType) {
+    private int getOpenFieldCountSize(ARecordType recordType) {
         return isExpanded(recordType) ? OPEN_COUNT_SIZE : 0;
     }
 
-    public int getOpenFieldCountOffset(ARecordType recordType) {
+    private int getOpenFieldCountOffset(ARecordType recordType) {
         return start + IntegerPointable.getInteger(bytes, getOpenPartOffset(recordType));
     }
 
@@ -300,67 +244,53 @@
     // Open field accessors.
     // -----------------------
 
-    public void getOpenFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
+    public final void getOpenFieldValue(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
         dOut.write(bytes, getOpenFieldValueOffset(recordType, fieldId), getOpenFieldValueSize(recordType, fieldId));
     }
 
-    public int getOpenFieldValueOffset(ARecordType recordType, int fieldId) {
+    public final int getOpenFieldValueOffset(ARecordType recordType, int fieldId) {
         return getOpenFieldNameOffset(recordType, fieldId) + getOpenFieldNameSize(recordType, fieldId);
     }
 
-    public int getOpenFieldValueSize(ARecordType recordType, int fieldId) throws HyracksDataException {
+    public final int getOpenFieldValueSize(ARecordType recordType, int fieldId) throws HyracksDataException {
         int offset = getOpenFieldValueOffset(recordType, fieldId);
         ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(getOpenFieldTag(recordType, fieldId));
         return NonTaggedFormatUtil.getFieldValueLength(bytes, offset, tag, true);
     }
 
-    public void getOpenFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
+    public final void getOpenFieldName(ARecordType recordType, int fieldId, DataOutput dOut) throws IOException {
         dOut.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
         dOut.write(bytes, getOpenFieldNameOffset(recordType, fieldId), getOpenFieldNameSize(recordType, fieldId));
     }
 
-    public String getOpenFieldName(ARecordType recordType, int fieldId) throws IOException {
+    public final String getOpenFieldName(ARecordType recordType, int fieldId) {
         StringBuilder str = new StringBuilder();
         int offset = getOpenFieldNameOffset(recordType, fieldId);
-        UTF8StringUtil.toString(str, bytes, offset);
-        String fieldName = str.toString();
-        return fieldName;
+        return UTF8StringUtil.toString(str, bytes, offset).toString();
     }
 
-    public int getOpenFieldNameSize(ARecordType recordType, int fieldId) {
+    private int getOpenFieldNameSize(ARecordType recordType, int fieldId) {
         int utfleng = UTF8StringUtil.getUTFLength(bytes, getOpenFieldNameOffset(recordType, fieldId));
         return utfleng + UTF8StringUtil.getNumBytesToStoreLength(utfleng);
     }
 
-    public int getOpenFieldNameOffset(ARecordType recordType, int fieldId) {
+    private int getOpenFieldNameOffset(ARecordType recordType, int fieldId) {
         return getOpenFieldOffset(recordType, fieldId);
     }
 
-    public byte getOpenFieldTag(ARecordType recordType, int fieldId) {
+    public final byte getOpenFieldTag(ARecordType recordType, int fieldId) {
         return bytes[getOpenFieldValueOffset(recordType, fieldId)];
     }
 
-    public int getOpenFieldHash(ARecordType recordType, int fieldId) {
-        return IntegerPointable.getInteger(bytes, getOpenFieldHashOffset(recordType, fieldId));
-    }
-
-    public int getOpenFieldHashOffset(ARecordType recordType, int fieldId) {
+    private int getOpenFieldHashOffset(ARecordType recordType, int fieldId) {
         return getOpenFieldCountOffset(recordType) + getOpenFieldCountSize(recordType) + fieldId * OPEN_FIELD_HEADER;
     }
 
-    public int getOpenFieldHashSize(ARecordType recordType, int fieldId) {
-        return OPEN_FIELD_HASH_SIZE;
-    }
-
-    public int getOpenFieldOffset(ARecordType recordType, int fieldId) {
+    private int getOpenFieldOffset(ARecordType recordType, int fieldId) {
         return start + IntegerPointable.getInteger(bytes, getOpenFieldOffsetOffset(recordType, fieldId));
     }
 
-    public int getOpenFieldOffsetOffset(ARecordType recordType, int fieldId) {
-        return getOpenFieldHashOffset(recordType, fieldId) + getOpenFieldHashSize(recordType, fieldId);
-    }
-
-    public int getOpenFieldOffsetSize(ARecordType recordType, int fieldId) {
-        return OPEN_FIELD_HASH_SIZE;
+    private int getOpenFieldOffsetOffset(ARecordType recordType, int fieldId) {
+        return getOpenFieldHashOffset(recordType, fieldId) + OPEN_FIELD_HASH_SIZE;
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java
new file mode 100644
index 0000000..0cd16dc
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.pointables.nonvisitor;
+
+import java.util.Comparator;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+/**
+ * This class can be used to hold information about a field in a record. The {@link #index} variable is the position of
+ * the field in the record as laid out in the bytes of the record. The value pointed to by the {@link #valueOffset}
+ * could be tagged or non-tagged.
+ */
+public final class RecordField {
+
+    public static final Comparator<RecordField> FIELD_NAME_COMP =
+            (field1, field2) -> UTF8StringPointable.compare(field1.namePointable, field2.namePointable);
+    private UTF8StringPointable namePointable;
+    private ATypeTag valueTag;
+    private int index;
+    private int valueOffset;
+    private int valueLength;
+
+    RecordField() {
+    }
+
+    // for open field
+    final void set(UTF8StringPointable namePointable, int index, int valueOffset, int valueLength, ATypeTag valueTag) {
+        this.namePointable = namePointable;
+        set(index, valueOffset, valueLength, valueTag);
+    }
+
+    // for closed field where the name is already set
+    final void set(int index, int valueOffset, int valueLength, ATypeTag valueTag) {
+        this.index = index;
+        this.valueOffset = valueOffset;
+        this.valueTag = valueTag;
+        this.valueLength = valueLength;
+    }
+
+    public final UTF8StringPointable getName() {
+        return namePointable;
+    }
+
+    final int getIndex() {
+        return index;
+    }
+
+    final int getValueOffset() {
+        return valueOffset;
+    }
+
+    final int getValueLength() {
+        return valueLength;
+    }
+
+    final ATypeTag getValueTag() {
+        return valueTag;
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java
new file mode 100644
index 0000000..1d9ac6c
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.pointables.nonvisitor;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.asterix.om.util.container.ObjectFactories;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+/**
+ * This class is used to access the fields of a record in lexicographical order of the field names. The fields can be
+ * retrieved by calling {@link #poll()} which will poll the field with the minimum field name. The field value can be
+ * retrieved lazily by using the information stored in the {@link RecordField} returned from polling. Before
+ * accessing the record information, {@link #reset(byte[], int)} has to be called first to point to the bytes of the
+ * record in question.
+ */
+public final class SortedRecord {
+
+    private static final IObjectFactory<RecordField, Void> FIELD_FACTORY = type -> new RecordField();
+    // TODO(ali): copied from PointableHelper for now.
+    private static final byte[] NULL_BYTES = new byte[] { ATypeTag.SERIALIZED_NULL_TYPE_TAG };
+    private static final byte[] MISSING_BYTES = new byte[] { ATypeTag.SERIALIZED_MISSING_TYPE_TAG };
+    private final IObjectPool<UTF8StringPointable, Void> utf8Pool = new ListObjectPool<>(ObjectFactories.UTF8_FACTORY);
+    private final IObjectPool<RecordField, Void> recordFieldPool = new ListObjectPool<>(FIELD_FACTORY);
+    private final PriorityQueue<RecordField> sortedFields = new PriorityQueue<>(RecordField.FIELD_NAME_COMP);
+    private final ARecordType recordType;
+    private final IAType[] fieldTypes;
+    private final RecordField[] closedFields;
+    private final int numSchemaFields;
+    private final boolean hasOptionalFields;
+    private final int nullBitMapSize;
+    private byte[] bytes;
+
+    public SortedRecord(ARecordType recordType) {
+        String[] fieldNames = recordType.getFieldNames();
+        this.numSchemaFields = fieldNames.length;
+        this.recordType = recordType;
+        this.fieldTypes = recordType.getFieldTypes();
+        this.hasOptionalFields = NonTaggedFormatUtil.hasOptionalField(recordType);
+        this.nullBitMapSize = RecordUtil.computeNullBitmapSize(hasOptionalFields, recordType);
+        this.closedFields = new RecordField[numSchemaFields];
+        UTF8StringWriter utf8Writer = new UTF8StringWriter();
+        ByteArrayAccessibleOutputStream byteArrayStream = new ByteArrayAccessibleOutputStream();
+        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayStream);
+        try {
+            // write each closed field into a pointable, create a new field and add to the list of closedFields
+            for (int i = 0; i < numSchemaFields; i++) {
+                int nameStart = dataOutputStream.size();
+                utf8Writer.writeUTF8(fieldNames[i], dataOutputStream);
+                int nameEnd = dataOutputStream.size();
+                UTF8StringPointable utf8Pointable = UTF8StringPointable.FACTORY.createPointable();
+                utf8Pointable.set(byteArrayStream.getByteArray(), nameStart, nameEnd - nameStart);
+                RecordField field = new RecordField();
+                field.set(utf8Pointable, -1, -1, -1, null);
+                closedFields[i] = field;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * This method should be called before using the record. It recalculates logical indices and offsets of fields,
+     * closed and open. It populates the utf8 filed names for open.
+     */
+    public final void reset(byte[] data, int start) throws HyracksDataException {
+        bytes = data;
+        reset();
+        boolean isExpanded = false;
+        // advance to expanded byte if present
+        int cursor = start + ARecordPointable.TAG_SIZE + ARecordPointable.RECORD_LENGTH_SIZE;
+        if (recordType.isOpen()) {
+            isExpanded = bytes[cursor] == 1;
+            // advance to either open part offset or number of closed fields
+            cursor += ARecordPointable.EXPANDED_SIZE;
+        }
+        int openPartOffset = 0;
+        if (isExpanded) {
+            openPartOffset = start + AInt32SerializerDeserializer.getInt(bytes, cursor);
+            // advance to number of closed fields
+            cursor += ARecordPointable.OPEN_OFFSET_SIZE;
+        }
+        int fieldOffset;
+        int length;
+        int fieldIndex = 0;
+        ATypeTag tag;
+        // advance to where fields offsets are (or to null bit map if the schema has optional fields)
+        cursor += ARecordPointable.CLOSED_COUNT_SIZE;
+        int nullBitMapOffset = cursor;
+        int fieldsOffsets = cursor + nullBitMapSize;
+        // compute the offsets of each closed field value and whether it's missing or null
+        for (int i = 0; i < numSchemaFields; i++, fieldIndex++) {
+            fieldOffset = AInt32SerializerDeserializer.getInt(bytes, fieldsOffsets) + start;
+            tag = TypeComputeUtils.getActualType(fieldTypes[i]).getTypeTag();
+            if (hasOptionalFields) {
+                byte nullBits = bytes[nullBitMapOffset + i / 4];
+                if (RecordUtil.isNull(nullBits, i)) {
+                    tag = ATypeTag.NULL;
+                } else if (RecordUtil.isMissing(nullBits, i)) {
+                    tag = ATypeTag.MISSING;
+                }
+            }
+            length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, tag, false);
+            closedFields[i].set(fieldIndex, fieldOffset, length, tag);
+            if (tag != ATypeTag.MISSING) {
+                sortedFields.add(closedFields[i]);
+            }
+            fieldsOffsets += ARecordPointable.FIELD_OFFSET_SIZE;
+        }
+        // then populate open fields info second, an open field has name + value (tagged)
+        if (isExpanded) {
+            int numberOpenFields = AInt32SerializerDeserializer.getInt(bytes, openPartOffset);
+            fieldOffset = openPartOffset + ARecordPointable.OPEN_COUNT_SIZE + (8 * numberOpenFields);
+            for (int i = 0; i < numberOpenFields; i++, fieldIndex++) {
+                // get a pointable to the field name
+                length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, ATypeTag.STRING, false);
+                UTF8StringPointable openFieldName = utf8Pool.allocate(null);
+                openFieldName.set(bytes, fieldOffset, length);
+                // move to the value
+                fieldOffset += length;
+                tag = ATypeTag.VALUE_TYPE_MAPPING[bytes[fieldOffset]];
+                // +1 to account for the tag included since the field is open
+                length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, tag, true) + 1;
+                RecordField openField = recordFieldPool.allocate(null);
+                openField.set(openFieldName, fieldIndex, fieldOffset, length, tag);
+                sortedFields.add(openField);
+                // then skip the value to the next field name
+                fieldOffset += length;
+            }
+        }
+    }
+
+    private void reset() {
+        sortedFields.clear();
+        utf8Pool.reset();
+        recordFieldPool.reset();
+    }
+
+    public final boolean isEmpty() {
+        return sortedFields.isEmpty();
+    }
+
+    public final RecordField poll() {
+        return sortedFields.poll();
+    }
+
+    public final int size() {
+        return sortedFields.size();
+    }
+
+    public final IAType getFieldType(RecordField field) throws HyracksDataException {
+        return RecordUtil.getType(recordType, field.getIndex(), field.getValueTag());
+    }
+
+    public final void getFieldValue(RecordField field, IPointable pointable, ArrayBackedValueStorage storage)
+            throws IOException {
+        int fieldIdx = field.getIndex();
+        if (fieldIdx >= numSchemaFields) {
+            // open field
+            pointable.set(bytes, field.getValueOffset(), field.getValueLength());
+        } else {
+            // closed field
+            if (field.getValueTag() == ATypeTag.MISSING) {
+                pointable.set(MISSING_BYTES, 0, MISSING_BYTES.length);
+            } else if (field.getValueTag() == ATypeTag.NULL) {
+                pointable.set(NULL_BYTES, 0, NULL_BYTES.length);
+            } else {
+                // TODO(ali): this is not ideal. should not need to copy the tag when tagged pointables are introduced
+                int start = storage.getLength();
+                storage.getDataOutput().writeByte(field.getValueTag().serialize());
+                storage.getDataOutput().write(bytes, field.getValueOffset(), field.getValueLength());
+                int end = storage.getLength();
+                pointable.set(storage.getByteArray(), start, end - start);
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
index 941f59f..53ae805 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java
@@ -122,7 +122,7 @@
         VALUE_TYPE_MAPPING = typeList.toArray(new ATypeTag[typeList.size()]);
     }
 
-    ATypeTag(int value) {
+    private ATypeTag(int value) {
         this.value = (byte) value;
     }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
index e4167ec..5d97125 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java
@@ -85,7 +85,6 @@
             case OBJECT:
                 return RecordUtil.FULLY_OPEN_RECORD_TYPE;
             case MULTISET:
-                // TODO: how come the item type in this instance is "null"
                 return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE;
             case ARRAY:
                 return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java
index bdc1943..78c411a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java
@@ -24,10 +24,8 @@
 import java.util.List;
 
 /**
- * Object pool backed by a list.
- * The argument for creating E instances could be different. This class also
- * considers arguments in object reusing, e.g., it reuses an E instances ONLY
- * when the construction argument is "equal".
+ * Object pool backed by a list. The argument for creating E instances could be different. This class also considers
+ * arguments in object reusing, e.g., it reuses an E instances ONLY when the construction argument is "equal".
  */
 public class ListObjectPool<E, T> implements IObjectPool<E, T> {
 
@@ -36,12 +34,12 @@
     /**
      * cache of objects
      */
-    private List<E> pool = new ArrayList<E>();
+    private List<E> pool = new ArrayList<>();
 
     /**
      * args that are used to create each element in the pool
      */
-    private List<T> args = new ArrayList<T>();
+    private List<T> args = new ArrayList<>();
 
     /**
      * bits indicating which element is in use
@@ -85,7 +83,7 @@
 
     @Override
     public boolean free(E element) {
-        for (int i = pool.size() - 1; i >= 0; i--) {
+        for (int i = usedBits.length(); (i = usedBits.previousSetBit(i - 1)) >= 0;) {
             if (element == pool.get(i)) {
                 usedBits.clear(i);
                 return true;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java
index 763e1d4..2c6e408 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java
@@ -20,8 +20,11 @@
 
 import java.util.BitSet;
 
+import org.apache.asterix.om.pointables.nonvisitor.SortedRecord;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
@@ -36,8 +39,10 @@
     private ObjectFactories() {
     }
 
-    public static final IObjectFactory<IPointable, Void> VOID_FACTORY = (type) -> new VoidPointable();
+    public static final IObjectFactory<IPointable, Void> VOID_FACTORY = type -> new VoidPointable();
     public static final IObjectFactory<IMutableValueStorage, Void> STORAGE_FACTORY =
-            (type) -> new ArrayBackedValueStorage();
-    public static final IObjectFactory<BitSet, Void> BIT_SET_FACTORY = (type) -> new BitSet();
+            type -> new ArrayBackedValueStorage();
+    public static final IObjectFactory<BitSet, Void> BIT_SET_FACTORY = type -> new BitSet();
+    public static final IObjectFactory<UTF8StringPointable, Void> UTF8_FACTORY = type -> new UTF8StringPointable();
+    public static final IObjectFactory<SortedRecord, ARecordType> RECORD_FACTORY = SortedRecord::new;
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
index ec1c7cb..cdbe8d3 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
@@ -20,9 +20,13 @@
 
 import java.util.List;
 
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class RecordUtil {
     /**
@@ -35,44 +39,52 @@
     }
 
     /**
-     * Create a fully open record type with the passed name
-     *
-     * @param name
-     * @return
-     */
-    public static ARecordType createOpenRecordType(String name) {
-        return new ARecordType(name, new String[0], new IAType[0], true);
-    }
-
-    /**
-     * A util method that takes a field name and return a String representation for error messages
-     *
-     * @param field
-     * @return
+     * @param field a field name represented by a list
+     * @return string version of the field
      */
     public static String toFullyQualifiedName(List<String> field) {
         return StringUtils.join(field, ".");
     }
 
     /**
-     * A util method that takes String array and return a String representation for error messages
-     *
-     * @param field
-     * @return
+     * @param names a hierarchy of entity names
+     * @return string version of the entity name to be qualified
      */
     public static String toFullyQualifiedName(String... names) {
         return StringUtils.join(names, ".");
     }
 
     /**
-     * compute the null Bitmap size for the open fields
+     * Computes the null Bitmap size when the schema has optional fields (nullable/missable)
      *
-     * @param recordType
-     *            the record type
-     * @return the size of the bitmap
+     * @param recordType the record type
+     * @return the size of the bitmap in number of bytes
      */
     public static int computeNullBitmapSize(ARecordType recordType) {
-        return NonTaggedFormatUtil.hasOptionalField(recordType)
-                ? (int) Math.ceil(recordType.getFieldNames().length / 4.0) : 0;
+        return computeNullBitmapSize(NonTaggedFormatUtil.hasOptionalField(recordType), recordType);
+    }
+
+    public static int computeNullBitmapSize(boolean hasOptionalField, ARecordType recordType) {
+        // each field needs 2 bits for MISSING, NULL, and VALUE. for 4 fields, 4*2=8 bits (1 byte), thus divide by 4.
+        return hasOptionalField ? (int) Math.ceil(recordType.getFieldTypes().length / 4.0) : 0;
+    }
+
+    public static boolean isNull(byte nullMissingBits, int fieldIndex) {
+        int position = 1 << (7 - 2 * (fieldIndex % 4));
+        return (nullMissingBits & position) == 0;
+    }
+
+    public static boolean isMissing(byte nullMissingBits, int fieldIndex) {
+        int position = 1 << (7 - 2 * (fieldIndex % 4) - 1);
+        return (nullMissingBits & position) == 0;
+    }
+
+    public static IAType getType(ARecordType recordType, int fieldIdx, ATypeTag fieldTag) throws HyracksDataException {
+        IAType[] fieldTypes = recordType.getFieldTypes();
+        if (fieldIdx >= fieldTypes.length) {
+            return fieldTag.isDerivedType() ? DefaultOpenFieldType.getDefaultOpenFieldType(fieldTag)
+                    : TypeTagUtil.getBuiltinTypeByTag(fieldTag);
+        }
+        return fieldTypes[fieldIdx];
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
index f683615..4137581 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -154,6 +154,14 @@
         return UTF8StringUtil.compareTo(this.bytes, this.start, bytes, start);
     }
 
+    // TODO(ali): could use the normalized key, too.
+    // takes advantage of cached utf8 length and meta length
+    public static int compare(UTF8StringPointable pointable1, UTF8StringPointable pointable2) {
+        return UTF8StringUtil.compareTo(pointable1.bytes, pointable1.start + pointable1.metaLength,
+                pointable1.utf8Length, pointable2.bytes, pointable2.start + pointable2.metaLength,
+                pointable2.utf8Length);
+    }
+
     @Override
     public int hash() {
         if (hashValue == 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index 3b3e7b4..9aa2b5d 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -241,6 +241,12 @@
         return compareTo(thisBytes, thisStart, thatBytes, thatStart, false, false);
     }
 
+    // the start and length of each are the ones calculated by UTF8StringPointable. caller should provide proper values
+    public static int compareTo(byte[] thisBytes, int thisStart, int thisLength, byte[] thatBytes, int thatStart,
+            int thatLength) {
+        return compareTo(thisBytes, thisStart, thisLength, thatBytes, thatStart, thatLength, false, false);
+    }
+
     /**
      * This function provides the raw bytes-based comparison for UTF8 strings.
      * Note that the comparison may not deliver the correct ordering for certain languages that include 2 or 3 bytes characters.