[ASTERIXDB-3324][STO][RT] Clossed types stabilization in columnar

Details:
This patch includes multiple fixes described in ASTERIXDB-3324

Change-Id: Idf5be82359f6fd2f9d80cf33b07b9248218c70cd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17995
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 0b65d93..90444fb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16441,10 +16441,10 @@
     <test-case FilePath="column">
       <compilation-unit name="supported-types">
         <output-dir compare="Text">supported-types</output-dir>
-        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, double, string, boolean, uuid]</expected-error>
-        <expected-error>ASX0067: Type(s) '[datetime, date, time, duration]' are not supported in columnar storage format. Supported types are [bigint, double, string, boolean, uuid]</expected-error>
-        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, double, string, boolean, uuid]</expected-error>
-        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, double, string, boolean, uuid]</expected-error>
+        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, float, double, string, boolean, uuid]</expected-error>
+        <expected-error>ASX0067: Type(s) '[datetime, date, time, duration]' are not supported in columnar storage format. Supported types are [bigint, float, double, string, boolean, uuid]</expected-error>
+        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, float, double, string, boolean, uuid]</expected-error>
+        <expected-error>ASX0067: Type(s) '[datetime]' are not supported in columnar storage format. Supported types are [bigint, float, double, string, boolean, uuid]</expected-error>
         <source-location>false</source-location>
       </compilation-unit>
     </test-case>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java
index 1a4c3ef..13820e0 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java
@@ -22,7 +22,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
-abstract class AbstractNestedValueAssembler extends AbstractValueAssembler {
+public abstract class AbstractNestedValueAssembler extends AbstractValueAssembler {
     protected final ArrayBackedValueStorage storage;
 
     AbstractNestedValueAssembler(int level, AssemblerInfo info) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
index 15e2bb9..a3101e8 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
@@ -27,6 +27,7 @@
 
 import org.apache.asterix.column.assembler.value.IValueGetter;
 import org.apache.asterix.column.assembler.value.IValueGetterFactory;
+import org.apache.asterix.column.metadata.FieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
@@ -95,16 +96,18 @@
 
         BitSet declaredFields = handleDeclaredFields(objectNode, info, objectAssembler);
         IntList childrenFieldNameIndexes = objectNode.getChildrenFieldNameIndexes();
-        if (declaredFields.length() < childrenFieldNameIndexes.size()) {
-            //Open fields are requested
+        int numberOfAddedChildren = declaredFields.cardinality();
+        if (numberOfAddedChildren < childrenFieldNameIndexes.size()) {
+            // Now handle any open fields
             for (int i = 0; i < childrenFieldNameIndexes.size(); i++) {
-                int fieldNameIndex = childrenFieldNameIndexes.getInt(i);
-                AbstractSchemaNode childNode = objectNode.getChild(fieldNameIndex);
-                if (childNode.getTypeTag() != ATypeTag.MISSING && !declaredFields.get(fieldNameIndex)) {
+                int fieldNameIdx = childrenFieldNameIndexes.getInt(i);
+                AbstractSchemaNode childNode = objectNode.getChild(fieldNameIdx);
+                if (fieldNameIdx == FieldNamesDictionary.DUMMY_FIELD_NAME_INDEX || !declaredFields.get(fieldNameIdx)) {
+                    numberOfAddedChildren++;
                     IAType childType = getChildType(childNode, BuiltinType.ANY);
-                    IValueReference fieldName = columnMetadata.getFieldNamesDictionary().getFieldName(fieldNameIndex);
+                    IValueReference fieldName = columnMetadata.getFieldNamesDictionary().getFieldName(fieldNameIdx);
                     //The last child should be a delegate
-                    boolean delegate = i == childrenFieldNameIndexes.size() - 1;
+                    boolean delegate = numberOfAddedChildren == childrenFieldNameIndexes.size();
                     AssemblerInfo childInfo = new AssemblerInfo(childType, objectAssembler, delegate, fieldName);
                     childNode.accept(this, childInfo);
                 }
@@ -125,17 +128,18 @@
         String[] declaredFieldNames = declaredType.getFieldNames();
         IAType[] declaredFieldTypes = declaredType.getFieldTypes();
 
-        // The last child of a declared field can be a delegate iff all requested fields are declared
-        boolean containsDelegate = objectNode.getChildren().size() == declaredFieldTypes.length;
+        int addedChildren = 0;
+        int requestedChildren = objectNode.getChildren().size();
         for (int i = 0; i < declaredFieldTypes.length; i++) {
             String fieldName = declaredFieldNames[i];
             int fieldNameIndex = columnMetadata.getFieldNamesDictionary().getFieldNameIndex(fieldName);
             //Check if the declared field was requested
             AbstractSchemaNode childNode = objectNode.getChild(fieldNameIndex);
             if (childNode.getTypeTag() != ATypeTag.MISSING) {
+                addedChildren++;
                 IAType childType = getChildType(childNode, declaredFieldTypes[i]);
                 processedFields.set(fieldNameIndex);
-                boolean delegate = containsDelegate && i == declaredFieldTypes.length - 1;
+                boolean delegate = addedChildren == requestedChildren;
                 AssemblerInfo childInfo = new AssemblerInfo(childType, objectAssembler, delegate, i);
                 childNode.accept(this, childInfo);
             }
@@ -204,7 +208,7 @@
     @Override
     public AbstractValueAssembler visit(PrimitiveSchemaNode primitiveNode, AssemblerInfo info) {
         AbstractPrimitiveValueAssembler assembler;
-        IValueGetter valueGetter = valueGetterFactory.createValueGetter(primitiveNode.getTypeTag());
+        IValueGetter valueGetter = valueGetterFactory.createValueGetter(getTypeTag(info, primitiveNode));
         if (!delimiters.isEmpty()) {
             IColumnValuesReader reader = readerFactory.createValueReader(primitiveNode.getTypeTag(),
                     primitiveNode.getColumnIndex(), level, getDelimiters());
@@ -227,6 +231,17 @@
         return assembler;
     }
 
+    private ATypeTag getTypeTag(AssemblerInfo info, PrimitiveSchemaNode primitiveNode) {
+        IAType declaredType = info.getDeclaredType();
+
+        if (declaredType.getTypeTag() == ATypeTag.ANY) {
+            return primitiveNode.getTypeTag();
+        }
+
+        // Declared types are not (and cannot be) normalized
+        return declaredType.getTypeTag();
+    }
+
     private int[] getDelimiters() {
         int numOfDelimiters = delimiters.size();
         int[] reversed = new int[numOfDelimiters];
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java
index 712e65c..bb085c3 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.assembler;
 
+import static org.apache.asterix.om.typecomputer.impl.TypeComputeUtils.getActualType;
+
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
@@ -60,7 +62,7 @@
     public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate,
             IValueReference fieldName, int fieldIndex, boolean fieldNameTagged) {
         this.parent = parent;
-        this.declaredType = declaredType;
+        this.declaredType = getActualType(declaredType);
         this.delegate = delegate;
         this.fieldName = fieldNameTagged ? fieldName : createTaggedFieldName(fieldName);
         this.fieldIndex = fieldIndex;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
index be46333..520f1d5 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
@@ -50,6 +50,8 @@
             throw createException();
         } else if (reader.isNull() && (isDelegate() || reader.getLevel() + 1 == level)) {
             addNullToAncestor(reader.getLevel());
+        } else if (reader.isMissing() && isDelegate() && reader.getLevel() < level) {
+            addMissingToAncestor(reader.getLevel());
         } else if (reader.isValue()) {
             addValueToParent();
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/FloatValueGetter.java
similarity index 79%
copy from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/FloatValueGetter.java
index e76e3c9..c183e68 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/FloatValueGetter.java
@@ -21,16 +21,16 @@
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
 
-class LongValueGetter extends AbstractFixedLengthValueGetter {
-    LongValueGetter() {
-        super(ATypeTag.BIGINT, Long.BYTES);
+class FloatValueGetter extends AbstractFixedLengthValueGetter {
+    FloatValueGetter() {
+        super(ATypeTag.FLOAT, Float.BYTES);
     }
 
     @Override
     public IValueReference getValue(IColumnValuesReader reader) {
-        LongPointable.setLong(value.getByteArray(), value.getStartOffset() + 1, reader.getLong());
+        FloatPointable.setFloat(value.getByteArray(), value.getStartOffset() + 1, reader.getFloat());
         return value;
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int16ValueGetter.java
similarity index 78%
copy from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int16ValueGetter.java
index e76e3c9..8c66728 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int16ValueGetter.java
@@ -21,16 +21,16 @@
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
 
-class LongValueGetter extends AbstractFixedLengthValueGetter {
-    LongValueGetter() {
-        super(ATypeTag.BIGINT, Long.BYTES);
+public class Int16ValueGetter extends AbstractFixedLengthValueGetter {
+    Int16ValueGetter() {
+        super(ATypeTag.SMALLINT, Short.BYTES);
     }
 
     @Override
     public IValueReference getValue(IColumnValuesReader reader) {
-        LongPointable.setLong(value.getByteArray(), value.getStartOffset() + 1, reader.getLong());
+        ShortPointable.setShort(value.getByteArray(), value.getStartOffset() + 1, (short) reader.getLong());
         return value;
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int32ValueGetter.java
similarity index 78%
copy from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int32ValueGetter.java
index e76e3c9..969f5c2 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int32ValueGetter.java
@@ -21,16 +21,16 @@
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
 
-class LongValueGetter extends AbstractFixedLengthValueGetter {
-    LongValueGetter() {
-        super(ATypeTag.BIGINT, Long.BYTES);
+class Int32ValueGetter extends AbstractFixedLengthValueGetter {
+    Int32ValueGetter() {
+        super(ATypeTag.INTEGER, Integer.BYTES);
     }
 
     @Override
     public IValueReference getValue(IColumnValuesReader reader) {
-        LongPointable.setLong(value.getByteArray(), value.getStartOffset() + 1, reader.getLong());
+        IntegerPointable.setInteger(value.getByteArray(), value.getStartOffset() + 1, (int) reader.getLong());
         return value;
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int64ValueGetter.java
similarity index 93%
rename from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
rename to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int64ValueGetter.java
index e76e3c9..9152140 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int64ValueGetter.java
@@ -23,8 +23,8 @@
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 
-class LongValueGetter extends AbstractFixedLengthValueGetter {
-    LongValueGetter() {
+class Int64ValueGetter extends AbstractFixedLengthValueGetter {
+    Int64ValueGetter() {
         super(ATypeTag.BIGINT, Long.BYTES);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int8ValueGetter.java
similarity index 79%
copy from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int8ValueGetter.java
index e76e3c9..cb052dd 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/Int8ValueGetter.java
@@ -21,16 +21,17 @@
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.BytePointable;
 
-class LongValueGetter extends AbstractFixedLengthValueGetter {
-    LongValueGetter() {
-        super(ATypeTag.BIGINT, Long.BYTES);
+class Int8ValueGetter extends AbstractFixedLengthValueGetter {
+
+    Int8ValueGetter() {
+        super(ATypeTag.TINYINT, Byte.BYTES);
     }
 
     @Override
     public IValueReference getValue(IColumnValuesReader reader) {
-        LongPointable.setLong(value.getByteArray(), value.getStartOffset() + 1, reader.getLong());
+        BytePointable.setByte(value.getByteArray(), value.getStartOffset() + 1, (byte) reader.getLong());
         return value;
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java
index 5f7fd7e..bb3839f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java
@@ -35,8 +35,16 @@
                 return MissingValueGetter.INSTANCE;
             case BOOLEAN:
                 return new BooleanValueGetter();
+            case TINYINT:
+                return new Int8ValueGetter();
+            case SMALLINT:
+                return new Int16ValueGetter();
+            case INTEGER:
+                return new Int32ValueGetter();
             case BIGINT:
-                return new LongValueGetter();
+                return new Int64ValueGetter();
+            case FLOAT:
+                return new FloatValueGetter();
             case DOUBLE:
                 return new DoubleValueGetter();
             case STRING:
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java
index 5f5b88c..230da0a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java
@@ -40,6 +40,10 @@
         throw new UnsupportedOperationException(getClass().getName());
     }
 
+    public float readFloat() {
+        throw new UnsupportedOperationException(getClass().getName());
+    }
+
     public double readDouble() {
         throw new UnsupportedOperationException(getClass().getName());
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
index 07713e1..417043e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
@@ -68,6 +68,15 @@
     }
 
     @Override
+    public float readFloat() {
+        try {
+            return in.readFloat();
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read double", e);
+        }
+    }
+
+    @Override
     public double readDouble() {
         try {
             return in.readDouble();
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
index 97e5746..e54a19a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
@@ -85,6 +85,13 @@
     /**
      * @param v the value to encode
      */
+    public void writeFloat(float v) {
+        throw new UnsupportedOperationException(getClass().getName());
+    }
+
+    /**
+     * @param v the value to encode
+     */
     public void writeLong(long v) {
         throw new UnsupportedOperationException(getClass().getName());
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
index 2aba7d2..1dbaa03 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
@@ -62,6 +62,15 @@
     }
 
     @Override
+    public void writeFloat(float v) {
+        try {
+            out.writeFloat(v);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write int", e);
+        }
+    }
+
+    @Override
     public final void writeDouble(double v) {
         try {
             out.writeDouble(v);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
index ee975f1..1b6fba9 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.parquet.bytes.LittleEndianDataInputStream;
@@ -59,6 +60,11 @@
         return LongPointable.getLong(readBuffer, 0);
     }
 
+    public float readFloat() throws IOException {
+        readFully(readBuffer, Float.BYTES);
+        return FloatPointable.getFloat(readBuffer, 0);
+    }
+
     public double readDouble() throws IOException {
         readFully(readBuffer, Double.BYTES);
         return DoublePointable.getDouble(readBuffer, 0);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
index a106a00..d478ee5 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
@@ -22,6 +22,7 @@
 import java.io.OutputStream;
 
 import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 
@@ -49,9 +50,13 @@
         out.write(writeBuffer, 0, Long.BYTES);
     }
 
+    public void writeFloat(float value) throws IOException {
+        FloatPointable.setFloat(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Float.BYTES);
+    }
+
     public void writeDouble(double value) throws IOException {
         DoublePointable.setDouble(writeBuffer, 0, value);
         out.write(writeBuffer, 0, Double.BYTES);
     }
-
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
index aa2e194..9ac226b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/FieldNamesDictionary.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.util.string.UTF8StringReader;
 import org.apache.hyracks.util.string.UTF8StringWriter;
@@ -42,6 +43,11 @@
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 
 public class FieldNamesDictionary {
+    /**
+     * Dummy field name used to add a column when encountering empty object
+     */
+    public static final IValueReference DUMMY_FIELD_NAME;
+    public static final int DUMMY_FIELD_NAME_INDEX = -1;
     //For both declared and inferred fields
     private final List<IValueReference> fieldNames;
     private final Object2IntMap<String> declaredFieldNamesToIndexMap;
@@ -55,6 +61,12 @@
     //For lookups
     private final ArrayBackedValueStorage lookupStorage;
 
+    static {
+        VoidPointable dummy = new VoidPointable();
+        dummy.set(new byte[0], 0, 0);
+        DUMMY_FIELD_NAME = dummy;
+    }
+
     public FieldNamesDictionary() {
         this(new ArrayList<>(), new Object2IntOpenHashMap<>(), new Int2IntOpenHashMap());
     }
@@ -78,6 +90,10 @@
 
     //TODO solve collision (they're so rare that I haven't seen any)
     public int getOrCreateFieldNameIndex(IValueReference fieldName) throws HyracksDataException {
+        if (fieldName == DUMMY_FIELD_NAME) {
+            return DUMMY_FIELD_NAME_INDEX;
+        }
+
         int hash = getHash(fieldName);
         if (!hashToFieldNameIndexMap.containsKey(hash)) {
             int index = addFieldName(creatFieldName(fieldName), hash);
@@ -137,6 +153,9 @@
     }
 
     public IValueReference getFieldName(int index) {
+        if (index == DUMMY_FIELD_NAME_INDEX) {
+            return DUMMY_FIELD_NAME;
+        }
         return fieldNames.get(index);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
index c9d8635..622705c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/AbstractSchemaNode.java
@@ -77,7 +77,11 @@
             case NULL:
             case MISSING:
             case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
             case BIGINT:
+            case FLOAT:
             case DOUBLE:
             case STRING:
             case UUID:
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
index a230e86..451ece5 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/ObjectSchemaNode.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.column.metadata.FieldNamesDictionary;
 import org.apache.asterix.column.metadata.PathInfoSerializer;
 import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
 import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
@@ -41,14 +42,17 @@
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntImmutableList;
 import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntUnaryOperator;
 
 public final class ObjectSchemaNode extends AbstractSchemaNestedNode {
     private final Int2IntMap fieldNameIndexToChildIndexMap;
     private final List<AbstractSchemaNode> children;
+    private IntUnaryOperator nextIndex;
 
     public ObjectSchemaNode() {
         fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
         children = new ArrayList<>();
+        nextIndex = this::nextIndex;
     }
 
     ObjectSchemaNode(DataInput input, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels)
@@ -60,6 +64,11 @@
 
         fieldNameIndexToChildIndexMap = new Int2IntOpenHashMap();
         deserializeFieldNameIndexToChildIndex(input, fieldNameIndexToChildIndexMap, numberOfChildren);
+        if (fieldNameIndexToChildIndexMap.containsKey(FieldNamesDictionary.DUMMY_FIELD_NAME_INDEX)) {
+            nextIndex = this::emptyColumnIndex;
+        } else {
+            nextIndex = this::nextIndex;
+        }
 
         children = new ArrayList<>();
         deserializeChildren(input, children, numberOfChildren, definitionLevels);
@@ -69,7 +78,7 @@
             FlushColumnMetadata columnMetadata) throws HyracksDataException {
         int numberOfChildren = children.size();
         int fieldNameIndex = columnMetadata.getFieldNamesDictionary().getOrCreateFieldNameIndex(fieldName);
-        int childIndex = fieldNameIndexToChildIndexMap.getOrDefault(fieldNameIndex, numberOfChildren);
+        int childIndex = fieldNameIndexToChildIndexMap.getOrDefault(fieldNameIndex, nextIndex.apply(fieldNameIndex));
         AbstractSchemaNode currentChild = childIndex == numberOfChildren ? null : children.get(childIndex);
         AbstractSchemaNode newChild = columnMetadata.getOrCreateChild(currentChild, childTypeTag);
         if (currentChild == null) {
@@ -88,6 +97,15 @@
         children.add(child);
     }
 
+    public void setEmptyObject(FlushColumnMetadata columnMetadata) throws HyracksDataException {
+        if (!children.isEmpty()) {
+            return;
+        }
+        AbstractSchemaNode emptyChild = columnMetadata.getOrCreateChild(null, ATypeTag.MISSING);
+        addChild(FieldNamesDictionary.DUMMY_FIELD_NAME_INDEX, emptyChild);
+        nextIndex = this::emptyColumnIndex;
+    }
+
     public AbstractSchemaNode getChild(int fieldNameIndex) {
         if (fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex)) {
             return children.get(fieldNameIndexToChildIndexMap.get(fieldNameIndex));
@@ -95,11 +113,6 @@
         return MissingFieldSchemaNode.INSTANCE;
     }
 
-    public void removeChild(int fieldNameIndex) {
-        int childIndex = fieldNameIndexToChildIndexMap.remove(fieldNameIndex);
-        children.remove(childIndex);
-    }
-
     public List<AbstractSchemaNode> getChildren() {
         return children;
     }
@@ -112,10 +125,6 @@
                 .sorted(Comparator.comparingInt(Entry::getIntValue)).mapToInt(Entry::getIntKey));
     }
 
-    public boolean containsField(int fieldNameIndex) {
-        return fieldNameIndexToChildIndexMap.containsKey(fieldNameIndex);
-    }
-
     @Override
     public ATypeTag getTypeTag() {
         return ATypeTag.OBJECT;
@@ -141,7 +150,8 @@
         output.write(ATypeTag.OBJECT.serialize());
         output.writeInt(children.size());
         for (Int2IntMap.Entry fieldNameIndexChildIndex : fieldNameIndexToChildIndexMap.int2IntEntrySet()) {
-            output.writeInt(fieldNameIndexChildIndex.getIntKey());
+            int fieldNameIndex = fieldNameIndexChildIndex.getIntKey();
+            output.writeInt(fieldNameIndex);
             output.writeInt(fieldNameIndexChildIndex.getIntValue());
         }
         pathInfoSerializer.enter(this);
@@ -179,4 +189,15 @@
             children.add(AbstractSchemaNode.deserialize(input, definitionLevels));
         }
     }
+
+    private int nextIndex(int fieldNameIndex) {
+        return children.size();
+    }
+
+    private int emptyColumnIndex(int fieldNameIndex) {
+        nextIndex = this::nextIndex;
+        fieldNameIndexToChildIndexMap.remove(FieldNamesDictionary.DUMMY_FIELD_NAME_INDEX);
+        fieldNameIndexToChildIndexMap.put(fieldNameIndex, 0);
+        return 0;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
index 3cacb8a..2503143 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/UnionSchemaNode.java
@@ -122,15 +122,18 @@
     }
 
     /**
-     * This would return any numeric node
+     * This would return any numeric node that has a different typeTag than the 'excludeTypeTag'
      *
+     * @param excludeTypeTag exclude child with the provided {@link ATypeTag}
      * @return first numeric node or missing node
      * @see SchemaClipperVisitor
      */
-    public AbstractSchemaNode getNumericChildOrMissing() {
-        for (AbstractSchemaNode node : children.values()) {
-            if (ATypeHierarchy.getTypeDomain(node.getTypeTag()) == ATypeHierarchy.Domain.NUMERIC) {
-                return node;
+    public AbstractSchemaNode getNumericChildOrMissing(ATypeTag excludeTypeTag) {
+        for (AbstractSchemaNode child : children.values()) {
+            ATypeTag childTypeTag = child.getTypeTag();
+            boolean numeric = ATypeHierarchy.getTypeDomain(childTypeTag) == ATypeHierarchy.Domain.NUMERIC;
+            if (numeric && childTypeTag != excludeTypeTag) {
+                return child;
             }
         }
         return MissingFieldSchemaNode.INSTANCE;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/ColumnSupportedTypesValidator.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/ColumnSupportedTypesValidator.java
index 5b27a74..76ea58f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/ColumnSupportedTypesValidator.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/ColumnSupportedTypesValidator.java
@@ -45,7 +45,7 @@
  */
 public class ColumnSupportedTypesValidator implements IATypeVisitor<Void, Set<ATypeTag>> {
     private static final Set<ATypeTag> SUPPORTED_PRIMITIVE_TYPES =
-            Set.of(ATypeTag.BOOLEAN, ATypeTag.BIGINT, ATypeTag.DOUBLE, ATypeTag.STRING, ATypeTag.UUID);
+            Set.of(ATypeTag.BOOLEAN, ATypeTag.BIGINT, ATypeTag.FLOAT, ATypeTag.DOUBLE, ATypeTag.STRING, ATypeTag.UUID);
     private static final String SUPPORTED_TYPES_STRING =
             SUPPORTED_PRIMITIVE_TYPES.stream().sorted().collect(Collectors.toList()).toString();
     private static final ColumnSupportedTypesValidator VALIDATOR = new ColumnSupportedTypesValidator();
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
index fb098fa..e6a85b8 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaBuilderFromIATypeVisitor.java
@@ -18,12 +18,15 @@
  */
 package org.apache.asterix.column.metadata.schema.visitor;
 
+import static org.apache.asterix.om.typecomputer.impl.TypeComputeUtils.getActualType;
+
 import java.util.List;
 
 import org.apache.asterix.column.metadata.FieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
-import org.apache.asterix.column.metadata.schema.collection.ArraySchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.MissingFieldSchemaNode;
 import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.AUnionType;
@@ -51,9 +54,10 @@
         ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
         columnMetadata.enterLevel(objectNode);
         try {
-            if (processedPrimaryKeys < primaryKeys.size()) {
+            if (isProcessingPrimaryKeys()) {
                 processPrimaryKeys(recordType, objectNode);
             }
+
             for (int i = 0; i < recordType.getFieldTypes().length; i++) {
                 processField(i, recordType, objectNode);
             }
@@ -66,8 +70,8 @@
 
     @Override
     public Void visit(AbstractCollectionType collectionType, AbstractSchemaNode arg) {
-        ArraySchemaNode collectionNode = (ArraySchemaNode) arg;
-        IAType itemType = collectionType.getItemType();
+        AbstractCollectionSchemaNode collectionNode = (AbstractCollectionSchemaNode) arg;
+        IAType itemType = getActualType(collectionType.getItemType());
         columnMetadata.enterLevel(collectionNode);
         try {
             AbstractSchemaNode itemNode = collectionNode.getOrCreateItem(itemType.getTypeTag(), columnMetadata);
@@ -86,7 +90,7 @@
 
     @Override
     public Void visitFlat(IAType flatType, AbstractSchemaNode arg) {
-        if (processedPrimaryKeys < primaryKeys.size()) {
+        if (isProcessingPrimaryKeys()) {
             processedPrimaryKeys++;
         }
         return null;
@@ -97,9 +101,14 @@
      * Handling primary keys and record fields conversion
      * **************************************************************
      */
+
+    private boolean isProcessingPrimaryKeys() {
+        return processedPrimaryKeys < primaryKeys.size();
+    }
+
     private void processPrimaryKeys(ARecordType recordType, ObjectSchemaNode objectNode) throws HyracksDataException {
         if (objectNode == columnMetadata.getRoot() || objectNode == columnMetadata.getMetaRoot()) {
-            while (processedPrimaryKeys < primaryKeys.size()) {
+            while (isProcessingPrimaryKeys()) {
                 currentPrimaryKeyPath = primaryKeys.get(processedPrimaryKeys);
                 currentPathIndex = 0;
                 processPrimaryKeyPath(recordType, objectNode);
@@ -113,6 +122,10 @@
     private void processPrimaryKeyPath(ARecordType recordType, ObjectSchemaNode objectNode)
             throws HyracksDataException {
         int fieldIndex = recordType.getFieldIndex(currentPrimaryKeyPath.get(currentPathIndex));
+        if (fieldIndex < 0) {
+            currentPathIndex--;
+            return;
+        }
         processField(fieldIndex, recordType, objectNode);
     }
 
@@ -122,10 +135,22 @@
         String[] fieldNames = recordType.getFieldNames();
         FieldNamesDictionary dictionary = columnMetadata.getFieldNamesDictionary();
 
+        if (isProcessingPrimaryKeys() && !fieldNames[fieldIndex].equals(currentPrimaryKeyPath.get(currentPathIndex))) {
+            // Still processing PKs, do not add any fields to the children until all PKs are processed
+            return;
+        }
+
         int fieldNameIndex = dictionary.getOrCreateFieldNameIndex(fieldNames[fieldIndex]);
+        AbstractSchemaNode childNode = objectNode.getChild(fieldNameIndex);
+        if (!childNode.isNested() && childNode != MissingFieldSchemaNode.INSTANCE) {
+            // Avoid processing the flat child twice
+            // Can happen if the child is a PK
+            return;
+        }
+
         IValueReference fieldName = dictionary.getFieldName(fieldNameIndex);
 
-        IAType fieldType = fieldTypes[fieldIndex];
+        IAType fieldType = getActualType(fieldTypes[fieldIndex]);
         AbstractSchemaNode child = objectNode.getOrCreateChild(fieldName, fieldType.getTypeTag(), columnMetadata);
 
         fieldType.accept(this, child);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaClipperVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaClipperVisitor.java
index 5db01f2..3d7d954 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaClipperVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/schema/visitor/SchemaClipperVisitor.java
@@ -87,11 +87,12 @@
 
     @Override
     public AbstractSchemaNode visit(AbstractCollectionType collectionType, AbstractSchemaNode arg) {
-        if (isNotCompatible(collectionType, arg)) {
+        // We check only if arg is a collection to include both array and multiset
+        if (!arg.isCollection() && isNotCompatible(collectionType, arg)) {
             return MissingFieldSchemaNode.INSTANCE;
         }
         AbstractCollectionSchemaNode collectionNode =
-                getActualNode(arg, collectionType.getTypeTag(), AbstractCollectionSchemaNode.class);
+                getActualNode(arg, arg.getTypeTag(), AbstractCollectionSchemaNode.class);
         AbstractSchemaNode newItemNode = collectionType.getItemType().accept(this, collectionNode.getItemNode());
         AbstractCollectionSchemaNode clippedCollectionNode =
                 AbstractCollectionSchemaNode.create(collectionType.getTypeTag());
@@ -115,14 +116,12 @@
     }
 
     private AbstractSchemaNode getNonCompatibleNumericNodeIfAny(IAType flatType, AbstractSchemaNode arg) {
-        ATypeHierarchy.Domain requestedDomain = ATypeHierarchy.getTypeDomain(flatType.getTypeTag());
-        ATypeHierarchy.Domain nodeDomain = ATypeHierarchy.getTypeDomain(arg.getTypeTag());
-        if (nodeDomain == requestedDomain && nodeDomain == ATypeHierarchy.Domain.NUMERIC) {
+        if (isNumeric(flatType.getTypeTag()) && isNumeric(arg.getTypeTag())) {
             // This will be reconciled by the filter accessor
             return arg;
         } else if (arg.getTypeTag() == ATypeTag.UNION) {
             UnionSchemaNode unionNode = (UnionSchemaNode) arg;
-            return unionNode.getNumericChildOrMissing();
+            return unionNode.getNumericChildOrMissing(flatType.getTypeTag());
         }
 
         return MissingFieldSchemaNode.INSTANCE;
@@ -139,14 +138,16 @@
     }
 
     private boolean isNotCompatible(IAType requestedType, AbstractSchemaNode schemaNode) {
-        if (requestedType.getTypeTag() != schemaNode.getTypeTag()) {
+        ATypeTag requestedTypeTag = requestedType.getTypeTag();
+        if (requestedTypeTag != schemaNode.getTypeTag()) {
             if (schemaNode.getTypeTag() != ATypeTag.UNION) {
                 warn(requestedType, schemaNode);
                 return true;
             }
             // Handle union
             UnionSchemaNode unionNode = (UnionSchemaNode) schemaNode;
-            return notInUnion(requestedType, unionNode);
+            return notInUnion(requestedType, unionNode)
+                    || isNumeric(requestedTypeTag) && unionContainsMultipleNumeric(schemaNode);
         }
         return unionContainsMultipleNumeric(schemaNode);
     }
@@ -178,4 +179,8 @@
         }
         return false;
     }
+
+    private static boolean isNumeric(ATypeTag typeTag) {
+        return ATypeHierarchy.getTypeDomain(typeTag) == ATypeHierarchy.Domain.NUMERIC;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
index d40e00c..cccac50 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
@@ -110,6 +110,11 @@
             }
         }
 
+        if (pointable.getNumberOfChildren() == 0) {
+            // Set as empty object
+            objectNode.setEmptyObject(columnMetadata);
+        }
+
         columnMetadata.exitNode(arg);
         currentParent = previousParent;
         return null;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index e89a120..2cee533 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -339,9 +339,10 @@
         AbstractSchemaNode currentChild = child;
         ATypeTag normalizedTypeTag = getNormalizedTypeTag(childTypeTag);
         if (currentChild == null || normalizedTypeTag != ATypeTag.MISSING && normalizedTypeTag != ATypeTag.NULL
-                && currentChild.getTypeTag() != ATypeTag.UNION && currentChild.getTypeTag() != normalizedTypeTag) {
+                && currentChild.getTypeTag() != ATypeTag.UNION
+                && getNormalizedTypeTag(currentChild.getTypeTag()) != normalizedTypeTag) {
             //Create a new child or union type if required type is different from the current child type
-            currentChild = createChild(child, normalizedTypeTag);
+            currentChild = createChild(child, childTypeTag);
             //Flag that the schema has changed
             changed = true;
         }
@@ -422,6 +423,23 @@
         }
     }
 
+    public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node)
+            throws HyracksDataException {
+        //Flush all definition levels from parent to the current node
+        flushDefinitionLevels(level, parent, node);
+        //Add null value (+2) to say that both the parent and the child are present
+        definitionLevels.get(node).add(ColumnValuesUtil.getNullMask(level + 2) | level);
+        node.incrementCounter();
+    }
+
+    public void close() {
+        //Dereference multiPageOp
+        multiPageOpRef.setValue(null);
+        for (int i = 0; i < columnWriters.size(); i++) {
+            columnWriters.get(i).close();
+        }
+    }
+
     private void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels,
             AbstractSchemaNode node) throws HyracksDataException {
         int startIndex = node.getCounter();
@@ -480,9 +498,10 @@
         }
     }
 
-    private AbstractSchemaNode createChild(AbstractSchemaNode child, ATypeTag normalizedTypeTag)
+    private AbstractSchemaNode createChild(AbstractSchemaNode child, ATypeTag childTypeTag)
             throws HyracksDataException {
         AbstractSchemaNode createdChild;
+        ATypeTag normalizedTypeTag = getNormalizedTypeTag(childTypeTag);
         if (child != null) {
             if (child.getTypeTag() == ATypeTag.NULL) {
                 //The previous child was a NULL. The new child needs to inherit the NULL definition levels
@@ -498,13 +517,13 @@
                 createdChild = addDefinitionLevelsAndGet(new UnionSchemaNode(child, createChild(normalizedTypeTag)));
             }
         } else {
-            createdChild = createChild(normalizedTypeTag);
+            createdChild = createChild(childTypeTag);
         }
         return createdChild;
     }
 
-    private AbstractSchemaNode createChild(ATypeTag normalizedTypeTag) throws HyracksDataException {
-        switch (normalizedTypeTag) {
+    private AbstractSchemaNode createChild(ATypeTag childTypeTag) throws HyracksDataException {
+        switch (childTypeTag) {
             case OBJECT:
                 return addDefinitionLevelsAndGet(new ObjectSchemaNode());
             case ARRAY:
@@ -514,12 +533,17 @@
             case NULL:
             case MISSING:
             case BOOLEAN:
+            case FLOAT:
             case DOUBLE:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
             case BIGINT:
             case STRING:
             case UUID:
                 int columnIndex = nullWriterIndexes.isEmpty() ? columnWriters.size() : nullWriterIndexes.removeInt(0);
                 boolean primaryKey = columnIndex < getNumberOfPrimaryKeys();
+                ATypeTag normalizedTypeTag = primaryKey ? childTypeTag : getNormalizedTypeTag(childTypeTag);
                 boolean writeAlways = primaryKey || repeated > 0;
                 boolean filtered = !primaryKey;
                 int maxLevel = primaryKey ? 1 : level + 1;
@@ -531,7 +555,7 @@
                 addColumn(columnIndex, writer);
                 return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, primaryKey);
             default:
-                throw new IllegalStateException("Unsupported type " + normalizedTypeTag);
+                throw new IllegalStateException("Unsupported type " + childTypeTag);
 
         }
     }
@@ -564,21 +588,4 @@
                     metaRecordSchema);
         }
     }
-
-    public void close() {
-        //Dereference multiPageOp
-        multiPageOpRef.setValue(null);
-        for (int i = 0; i < columnWriters.size(); i++) {
-            columnWriters.get(i).close();
-        }
-    }
-
-    public void addNestedNull(AbstractSchemaNestedNode parent, AbstractSchemaNestedNode node)
-            throws HyracksDataException {
-        //Flush all definition levels from parent to the current node
-        flushDefinitionLevels(level, parent, node);
-        //Add null value (+2) to say that both the parent and the child are present
-        definitionLevels.get(node).add(ColumnValuesUtil.getNullMask(level + 2) | level);
-        node.incrementCounter();
-    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java
index 3094abe..79d5f31 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java
@@ -54,8 +54,6 @@
             case SMALLINT:
             case INTEGER:
                 return ATypeTag.BIGINT;
-            case FLOAT:
-                return ATypeTag.DOUBLE;
             default:
                 return typeTag;
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/SchemaStringBuilderVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/SchemaStringBuilderVisitor.java
index 85aca8f..7134b2f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/SchemaStringBuilderVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/SchemaStringBuilderVisitor.java
@@ -82,7 +82,7 @@
 
         for (int i = 0; i < children.size(); i++) {
             int index = fieldNameIndexes.getInt(i);
-            String fieldName = fieldNames.get(index);
+            String fieldName = index < 0 ? "<empty>" : fieldNames.get(index);
             AbstractSchemaNode child = children.get(i);
             append(fieldName, index, child);
             child.accept(this, null);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
index fcb21c0..51ef2c1 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
@@ -108,6 +108,8 @@
 
     long getLong();
 
+    float getFloat();
+
     double getDouble();
 
     boolean getBoolean();
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
index 8ed6df6..133c744 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
@@ -147,6 +147,11 @@
     }
 
     @Override
+    public float getFloat() {
+        return valueReader.getFloat();
+    }
+
+    @Override
     public final double getDouble() {
         return valueReader.getDouble();
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
index bf80580..3411138 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
@@ -26,11 +26,13 @@
 import org.apache.asterix.column.values.reader.value.AbstractValueReader;
 import org.apache.asterix.column.values.reader.value.BooleanValueReader;
 import org.apache.asterix.column.values.reader.value.DoubleValueReader;
+import org.apache.asterix.column.values.reader.value.FloatValueReader;
 import org.apache.asterix.column.values.reader.value.LongValueReader;
 import org.apache.asterix.column.values.reader.value.NoOpValueReader;
 import org.apache.asterix.column.values.reader.value.StringValueReader;
 import org.apache.asterix.column.values.reader.value.UUIDValueReader;
 import org.apache.asterix.column.values.reader.value.key.DoubleKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.FloatKeyValueReader;
 import org.apache.asterix.column.values.reader.value.key.LongKeyValueReader;
 import org.apache.asterix.column.values.reader.value.key.StringKeyValueReader;
 import org.apache.asterix.column.values.reader.value.key.UUIDKeyValueReader;
@@ -71,8 +73,13 @@
                 return NoOpValueReader.INSTANCE;
             case BOOLEAN:
                 return new BooleanValueReader();
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
             case BIGINT:
-                return primaryKey ? new LongKeyValueReader() : new LongValueReader();
+                return primaryKey ? new LongKeyValueReader(typeTag) : new LongValueReader();
+            case FLOAT:
+                return primaryKey ? new FloatKeyValueReader() : new FloatValueReader();
             case DOUBLE:
                 return primaryKey ? new DoubleKeyValueReader() : new DoubleValueReader();
             case STRING:
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
index 4db082f..dbf4830 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
@@ -41,6 +41,10 @@
         throw new UnsupportedOperationException(getClass().getName());
     }
 
+    public float getFloat() {
+        throw new UnsupportedOperationException(getClass().getName());
+    }
+
     public double getDouble() {
         throw new UnsupportedOperationException(getClass().getName());
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/FloatValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/FloatValueReader.java
new file mode 100644
index 0000000..90f76e3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/FloatValueReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.decoder.ParquetPlainFixedLengthValuesReader;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.om.types.ATypeTag;
+
+public final class FloatValueReader extends AbstractValueReader {
+    private final ParquetPlainFixedLengthValuesReader floatReader;
+    private float nextValue;
+
+    public FloatValueReader() {
+        floatReader = new ParquetPlainFixedLengthValuesReader(Float.BYTES);
+    }
+
+    @Override
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
+        floatReader.initFromPage(in);
+    }
+
+    @Override
+    public void nextValue() {
+        nextValue = floatReader.readFloat();
+    }
+
+    @Override
+    public float getFloat() {
+        return nextValue;
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.FLOAT;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return Float.compare(nextValue, o.getFloat());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/FloatKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/FloatKeyValueReader.java
new file mode 100644
index 0000000..be83385
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/FloatKeyValueReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
+
+public final class FloatKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.FLOAT;
+    }
+
+    @Override
+    protected int getValueLength() {
+        return Float.BYTES;
+    }
+
+    @Override
+    public double getDouble() {
+        return FloatPointable.getFloat(value.getByteArray(), value.getStartOffset());
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return Float.compare(getFloat(), o.getFloat());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
index a981dca..b9456d8 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
@@ -23,9 +23,15 @@
 import org.apache.hyracks.data.std.primitive.LongPointable;
 
 public final class LongKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
+    private final ATypeTag typeTag;
+
+    public LongKeyValueReader(ATypeTag typeTag) {
+        this.typeTag = typeTag;
+    }
+
     @Override
     public ATypeTag getTypeTag() {
-        return ATypeTag.BIGINT;
+        return typeTag;
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
index 6a514ff..3d32a27 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
@@ -40,8 +40,14 @@
                 return new NullMissingColumnValuesWriter(columnIndex, maxLevel, writeAlways, filtered);
             case BOOLEAN:
                 return new BooleanColumnValuesWriter(columnIndex, maxLevel, writeAlways, filtered);
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
             case BIGINT:
-                return new LongColumnValuesWriter(multiPageOpRef, columnIndex, maxLevel, writeAlways, filtered);
+                return new LongColumnValuesWriter(multiPageOpRef, columnIndex, maxLevel, writeAlways, filtered,
+                        typeTag);
+            case FLOAT:
+                return new FloatColumnValuesWriter(multiPageOpRef, columnIndex, maxLevel, writeAlways, filtered);
             case DOUBLE:
                 return new DoubleColumnValuesWriter(multiPageOpRef, columnIndex, maxLevel, writeAlways, filtered);
             case STRING:
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
new file mode 100644
index 0000000..39abcad
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.values.writer.filters.DoubleColumnFilterWriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+
+public final class FloatColumnValuesWriter extends AbstractColumnValuesWriter {
+    private final ParquetPlainFixedLengthValuesWriter floatWriter;
+
+    public FloatColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
+            boolean collection, boolean filtered) {
+        super(columnIndex, level, collection, filtered);
+        floatWriter = new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
+    }
+
+    @Override
+    protected void addValue(ATypeTag tag, IValueReference value) throws IOException {
+        final float normalizedDouble = getValue(tag, value.getByteArray(), value.getStartOffset());
+        floatWriter.writeFloat(normalizedDouble);
+        filterWriter.addDouble(normalizedDouble);
+    }
+
+    private float getValue(ATypeTag typeTag, byte[] byteArray, int offset) {
+        switch (typeTag) {
+            case TINYINT:
+                return byteArray[offset];
+            case SMALLINT:
+                return ShortPointable.getShort(byteArray, offset);
+            case INTEGER:
+                return IntegerPointable.getInteger(byteArray, offset);
+            case BIGINT:
+                return LongPointable.getLong(byteArray, offset);
+            case FLOAT:
+                return FloatPointable.getFloat(byteArray, offset);
+            case DOUBLE:
+                return (float) DoublePointable.getDouble(byteArray, offset);
+            default:
+                throw new IllegalAccessError(typeTag + "is not of floating type");
+        }
+    }
+
+    @Override
+    protected void resetValues() throws HyracksDataException {
+        floatWriter.reset();
+    }
+
+    @Override
+    protected BytesInput getBytes() throws IOException {
+        return floatWriter.getBytes();
+    }
+
+    @Override
+    protected int getValuesEstimatedSize() {
+        return floatWriter.getEstimatedSize();
+    }
+
+    @Override
+    protected int calculateEstimatedSize(int length) {
+        return floatWriter.calculateEstimatedSize(length);
+    }
+
+    @Override
+    protected int getValuesAllocatedSize() {
+        return floatWriter.getAllocatedSize();
+    }
+
+    @Override
+    protected void addValue(IColumnValuesReader reader) throws IOException {
+        float value = reader.getFloat();
+        floatWriter.writeFloat(value);
+        filterWriter.addDouble(value);
+    }
+
+    @Override
+    protected AbstractColumnFilterWriter createFilter() {
+        return new DoubleColumnFilterWriter();
+    }
+
+    @Override
+    protected void closeValues() {
+        floatWriter.close();
+    }
+
+    @Override
+    protected ATypeTag getTypeTag() {
+        return ATypeTag.FLOAT;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
index 6e41af7..516f56d 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.values.writer;
 
+import static org.apache.asterix.column.util.ColumnValuesUtil.getNormalizedTypeTag;
+
 import java.io.IOException;
 
 import org.apache.asterix.column.bytes.encoder.AbstractParquetValuesWriter;
@@ -38,16 +40,19 @@
 
 final class LongColumnValuesWriter extends AbstractColumnValuesWriter {
     private final AbstractParquetValuesWriter longWriter;
+    private final ATypeTag typeTag;
 
     public LongColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
-            boolean collection, boolean filtered) {
+            boolean collection, boolean filtered, ATypeTag typeTag) {
         super(columnIndex, level, collection, filtered);
-        longWriter = !filtered ? new ParquetPlainFixedLengthValuesWriter(multiPageOpRef)
-                : new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef);
+        longWriter = filtered ? new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef)
+                : new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
+
+        this.typeTag = filtered ? getNormalizedTypeTag(typeTag) : typeTag;
     }
 
     @Override
-    protected void addValue(ATypeTag tag, IValueReference value) throws IOException {
+    protected void addValue(ATypeTag tag, IValueReference value) {
         final long normalizedInt = getValue(tag, value.getByteArray(), value.getStartOffset());
         longWriter.writeLong(normalizedInt);
         filterWriter.addLong(normalizedInt);
@@ -64,7 +69,7 @@
             case BIGINT:
                 return LongPointable.getLong(byteArray, offset);
             default:
-                throw new IllegalAccessError(typeTag + "is not of type integer");
+                throw new IllegalAccessError(typeTag + " is not of type integer");
         }
     }
 
@@ -112,6 +117,6 @@
 
     @Override
     protected ATypeTag getTypeTag() {
-        return ATypeTag.BIGINT;
+        return typeTag;
     }
 }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/reader/AbstractDummyColumnValuesReader.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/reader/AbstractDummyColumnValuesReader.java
index aceddaf..78ca96b 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/reader/AbstractDummyColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/reader/AbstractDummyColumnValuesReader.java
@@ -116,6 +116,11 @@
     }
 
     @Override
+    public float getFloat() {
+        return -1.0f;
+    }
+
+    @Override
     public final long getLong() {
         return -1;
     }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
index 45b9f7d..e955756 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.util.string.UTF8StringUtil;
 
@@ -273,6 +274,13 @@
             pointer += 1 + (isExpanded ? 4 : 0);
         }
 
+        // get number of the actual schema fields
+        int numberOfSchemaFields = IntegerPointable.getInteger(serRecord, pointer);
+        if (numberOfSchemaFields == 0) {
+            // This could happen when columnar datasets assemble empty records (result of filtered mega leaf nodes)
+            return -1;
+        }
+
         //advance to nullBitmap
         pointer += 4;
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/AbstractLazyNestedVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/AbstractLazyNestedVisitablePointable.java
index e9f8e8a..892a854 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/AbstractLazyNestedVisitablePointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/AbstractLazyNestedVisitablePointable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.om.lazy;
 
+import static org.apache.asterix.om.typecomputer.impl.TypeComputeUtils.getActualType;
+
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AbstractCollectionType;
@@ -121,13 +123,14 @@
      * @return a visitable pointable that corresponds to {@code type}
      */
     static AbstractLazyVisitablePointable createVisitable(IAType type) {
-        ATypeTag typeTag = type.getTypeTag();
+        IAType actualType = getActualType(type);
+        ATypeTag typeTag = actualType.getTypeTag();
         switch (typeTag) {
             case OBJECT:
-                return new TypedRecordLazyVisitablePointable(false, (ARecordType) type);
+                return new TypedRecordLazyVisitablePointable(false, (ARecordType) actualType);
             case ARRAY:
             case MULTISET:
-                AbstractCollectionType listType = (AbstractCollectionType) type;
+                AbstractCollectionType listType = (AbstractCollectionType) actualType;
                 return NonTaggedFormatUtil.isFixedSizedCollection(listType.getItemType())
                         ? new FixedListLazyVisitablePointable(false, listType)
                         : new VariableListLazyVisitablePointable(false, listType);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/FixedListLazyVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/FixedListLazyVisitablePointable.java
index f3153b2..711c31e 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/FixedListLazyVisitablePointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/FixedListLazyVisitablePointable.java
@@ -46,7 +46,7 @@
     @Override
     public void nextChild() {
         byte[] data = getByteArray();
-        int itemOffset = getStartOffset() + itemsOffset + currentIndex * itemSize;
+        int itemOffset = itemsOffset + currentIndex * itemSize;
         currentValue.set(data, itemOffset, itemSize);
         currentIndex++;
     }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/MissingLazyVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/MissingLazyVisitablePointable.java
new file mode 100644
index 0000000..db8f504
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/MissingLazyVisitablePointable.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.lazy;
+
+import org.apache.asterix.om.types.ATypeTag;
+
+class MissingLazyVisitablePointable extends FlatLazyVisitablePointable {
+    public static final FlatLazyVisitablePointable INSTANCE = new MissingLazyVisitablePointable();
+
+    public MissingLazyVisitablePointable() {
+        super(false, ATypeTag.MISSING);
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/NullLazyVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/NullLazyVisitablePointable.java
new file mode 100644
index 0000000..95e0f5b
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/NullLazyVisitablePointable.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.lazy;
+
+import org.apache.asterix.om.types.ATypeTag;
+
+class NullLazyVisitablePointable extends FlatLazyVisitablePointable {
+    public static final AbstractLazyVisitablePointable INSTANCE = new NullLazyVisitablePointable();
+
+    public NullLazyVisitablePointable() {
+        super(false, ATypeTag.NULL);
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/TypedRecordLazyVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/TypedRecordLazyVisitablePointable.java
index 19eb076..61e4eed 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/TypedRecordLazyVisitablePointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/TypedRecordLazyVisitablePointable.java
@@ -102,12 +102,23 @@
         if (isTaggedChild()) {
             visitablePointable = openVisitable;
         } else {
-            visitablePointable = closedVisitables[currentIndex];
+            visitablePointable = getClosedChildVisitable();
         }
         visitablePointable.set(getChildValue());
         return visitablePointable;
     }
 
+    private AbstractLazyVisitablePointable getClosedChildVisitable() {
+        switch (getChildTypeTag()) {
+            case MISSING:
+                return MissingLazyVisitablePointable.INSTANCE;
+            case NULL:
+                return NullLazyVisitablePointable.INSTANCE;
+            default:
+                return closedVisitables[currentIndex];
+        }
+    }
+
     private void setClosedValueInfo() throws HyracksDataException {
         ATypeTag typeTag = closedChildTags[currentIndex];
         if (typeTag == ATypeTag.NULL) {
@@ -148,7 +159,7 @@
         int currentPointer = pointer + 4;
         if (NonTaggedFormatUtil.hasOptionalField(recordType)) {
             initClosedChildrenTags(data, currentPointer);
-            currentPointer =
+            currentPointer +=
                     (numberOfClosedChildren % 4 == 0 ? numberOfClosedChildren / 4 : numberOfClosedChildren / 4 + 1);
         }
         closedValuesOffset = currentPointer;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/VariableListLazyVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/VariableListLazyVisitablePointable.java
index 9e4ab9f..4ea649b 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/VariableListLazyVisitablePointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/lazy/VariableListLazyVisitablePointable.java
@@ -38,6 +38,7 @@
         itemTag = listType.getItemType().getTypeTag();
         //-1 if not tagged. The offsets were calculated as if the tag exists.
         actualChildOffset = isTagged() ? 0 : -1;
+        currentChildTypeTag = itemTag.serialize();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java
index 5ac6833..804dda4 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BytePointable.java
@@ -66,7 +66,7 @@
         return bytes[start];
     }
 
-    private static void setByte(byte[] bytes, int start, byte value) {
+    public static void setByte(byte[] bytes, int start, byte value) {
         bytes[start] = value;
     }