[MULTIPLE ISSUES] Fixes for handling anti-matters in columnar datasets

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- ASTERIXDB-3214: Avoid skipping non-key columns when
  anti-matter was encountered
- ASTERIXDB-3213: Use single PK reader for both the
  cursor and the assembler
- ASTERIXDB-3212: Account for offset sizes for
  variable-length PKs in columnar datasets

Change-Id: If7dec034a2d75c80688523b19be6fdc38f077a65
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17599
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.001.ddl.sqlpp
new file mode 100644
index 0000000..b51e2a2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.001.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- Test merging components that only contain antimatter tuples
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE OpenType AS {
+    myId: int
+};
+
+CREATE DATASET ColumnDataset(OpenType)
+PRIMARY KEY myId WITH {
+    "storage-format": {"format" : "column"}
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.002.update.sqlpp
new file mode 100644
index 0000000..c7c9145
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.002.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+-- Produce relatively large component
+UPSERT INTO ColumnDataset (
+    SELECT VALUE object_concat({"myId": gen_id},
+        {  "coordinates": {"coordinates": [1.1],"type": "string"  },  "created_at": "string",  "entities": {"urls": [{  "display_url": "string",  "expanded_url": "string",  "indices": [1],  "url": "string"}],"user_mentions": [{  "id": 1,  "id_str": "string",  "indices": [1],  "name": "string",  "screen_name": "string"}]  },  "favorite_count": 1,  "favorited": true,  "filter_level": "string",  "geo": {"coordinates": [1.1],"type": "string"  },  "id": "0000000",  "id_str": "string",  "in_reply_to_screen_name": "string",  "in_reply_to_status_id": 1,  "in_reply_to_status_id_str": "string",  "in_reply_to_user_id": 1,  "in_reply_to_user_id_str": "string",  "is_quote_status": true,  "lang": "string",  "place": {"bounding_box": {  "coordinates": [[[1.1]]],  "type": "string"},"country": "string","country_code": "string","full_name": "string","id": "string","name": "string","place_type": "string","url": "string"  },  "possibly_sensitive": true,  "quoted_status": {"created_at": "string","entities": {"user_mentions": [{  "id": 1,  "id_str": "string",  "indices": [1],  "name": "string",  "screen_name": "string"}]},"favorite_count": 1,"favorited": true,"filter_level": "string","id": 1,"id_str": "string","in_reply_to_screen_name": "string","in_reply_to_status_id": 1,"in_reply_to_status_id_str": "string","in_reply_to_user_id": 1,"in_reply_to_user_id_str": "string","is_quote_status": true,"lang": "string","retweet_count": 1,"retweeted": true,"source": "string","text": "string","truncated": true,"user": {  "contributors_enabled": true,  "created_at": "string",  "default_profile": true,  "default_profile_image": true,  "description": "string",  "favourites_count": 1,  "followers_count": 1,  "friends_count": 1,  "geo_enabled": true,  "id": 1,  "id_str": "string",  "is_translator": true,  "lang": "string",  "listed_count": 1,  "name": "string",  "profile_background_color": "string",  "profile_background_image_url": "string",  "profile_background_image_url_https": "string",  "profile_background_tile": true,  "profile_banner_url": "string",  "profile_image_url": "string",  "profile_image_url_https": "string",  "profile_link_color": "string",  "profile_sidebar_border_color": "string",  "profile_sidebar_fill_color": "string",  "profile_text_color": "string",  "profile_use_background_image": true,  "protected": true,  "screen_name": "string",  "statuses_count": 1,  "verified": true}  },  "quoted_status_id": 1,  "quoted_status_id_str": "string",  "retweet_count": 1,  "retweeted": true,  "source": "string",  "text": "string",  "timestamp_ms": "string",  "truncated": true,  "user": {"contributors_enabled": true,"created_at": "string","default_profile": true,"default_profile_image": true,"description": "string","favourites_count": 1,"followers_count": 1,"friends_count": 1,"geo_enabled": true,"id": 1,"id_str": "string","is_translator": true,"lang": "string","listed_count": 1,"location": "string","name": "string","profile_background_color": "string","profile_background_image_url": "string","profile_background_image_url_https": "string","profile_background_tile": true,"profile_banner_url": "string","profile_image_url": "string","profile_image_url_https": "string","profile_link_color": "string","profile_sidebar_border_color": "string","profile_sidebar_fill_color": "string","profile_text_color": "string","profile_use_background_image": true,"protected": true,"screen_name": "string","statuses_count": 1,"time_zone": "string","url": "string","utc_offset": 1,"verified": true  }}
+    )
+    FROM range(0, 29999) gen_id
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.004.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.004.update.sqlpp
new file mode 100644
index 0000000..e72f9b2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.004.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+-- Delete 50% of the records and flush
+DELETE FROM ColumnDataset c
+WHERE c.myId % 2 = 0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.005.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.005.get.http
new file mode 100644
index 0000000..57d830a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.005.get.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/connector?dataverseName=test&datasetName=ColumnDataset
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.006.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.006.update.sqlpp
new file mode 100644
index 0000000..3591901
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.006.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+-- Delete all records
+DELETE FROM ColumnDataset c;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.007.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.007.query.sqlpp
new file mode 100644
index 0000000..d52cd6c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/delete/005/005.007.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset c
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/delete/005/005.005.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/delete/005/005.005.regexadm
new file mode 100644
index 0000000..16c1eaf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/delete/005/005.005.regexadm
@@ -0,0 +1 @@
+\Q{"keys":"myId","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OpenType","open":true,"fields":[{"myId":{"type":"AInt64"}}]},"splits":[\E.*\Q]}\E
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/delete/005/005.007.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/delete/005/005.007.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/delete/005/005.007.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_single_partition_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_single_partition_sqlpp.xml
index dbc5a1f..3dcfe8d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_single_partition_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_single_partition_sqlpp.xml
@@ -29,5 +29,10 @@
         <output-dir compare="Text">delete/004</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="column">
+      <compilation-unit name="delete/005">
+        <output-dir compare="Text">delete/005</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index cd182ce..a257c2d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -16137,6 +16137,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="delete/005">
+        <output-dir compare="Text">delete/005</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="meta/meta_after_gby">
         <output-dir compare="Text">meta/meta_after_gby</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
index 0d7c9c88..3c5d726 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
@@ -39,10 +39,6 @@
         this.reader = reader;
     }
 
-    public final void reset(AbstractBytesInputStream in, int numberOfTuples) throws HyracksDataException {
-        reader.reset(in, numberOfTuples);
-    }
-
     @Override
     public final IValueReference getValue() throws HyracksDataException {
         return primitiveValueGetter.getValue(reader);
@@ -85,9 +81,13 @@
         reader.skip(count);
     }
 
-    public boolean isEndOfGroupAssembler() {
-        return false;
-    }
+    /**
+     * Reset the assembler
+     *
+     * @param in             stream for value reader
+     * @param numberOfTuples in the current mega leaf node
+     */
+    public abstract void reset(AbstractBytesInputStream in, int numberOfTuples) throws HyracksDataException;
 
     /**
      * Move to the next primitive value assembler
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
index 1895404..5acfa08 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerBuilderVisitor.java
@@ -21,7 +21,9 @@
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.column.assembler.value.IValueGetter;
 import org.apache.asterix.column.assembler.value.IValueGetterFactory;
@@ -52,6 +54,7 @@
     private final IColumnValuesReaderFactory readerFactory;
     private final List<AbstractPrimitiveValueAssembler> valueAssemblers;
     private final IValueGetterFactory valueGetterFactory;
+    private final Map<Integer, IColumnValuesReader> primaryKeyReaders;
     private AbstractValueAssembler rootAssembler;
 
     //Recursion info
@@ -66,6 +69,10 @@
         this.valueGetterFactory = valueGetterFactory;
         valueAssemblers = new ArrayList<>();
         delimiters = new IntArrayList();
+        primaryKeyReaders = new HashMap<>();
+        for (IColumnValuesReader reader : columnMetadata.getPrimaryKeyReaders()) {
+            primaryKeyReaders.put(reader.getColumnIndex(), reader);
+        }
     }
 
     public AbstractPrimitiveValueAssembler[] createValueAssemblers(AbstractSchemaNode requestedSchema,
@@ -206,9 +213,15 @@
             setDelegate(reader, (RepeatedPrimitiveValueAssembler) assembler);
 
         } else {
-            IColumnValuesReader reader = readerFactory.createValueReader(primitiveNode.getTypeTag(),
-                    primitiveNode.getColumnIndex(), level, primitiveNode.isPrimaryKey());
-            assembler = new PrimitiveValueAssembler(level, info, reader, valueGetter);
+            IColumnValuesReader reader;
+            boolean primaryKey = primitiveNode.isPrimaryKey();
+            if (primaryKey) {
+                reader = primaryKeyReaders.get(primitiveNode.getColumnIndex());
+            } else {
+                reader = readerFactory.createValueReader(primitiveNode.getTypeTag(), primitiveNode.getColumnIndex(),
+                        level, false);
+            }
+            assembler = new PrimitiveValueAssembler(level, info, reader, valueGetter, primaryKey);
         }
         valueAssemblers.add(assembler);
         return assembler;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EndOfRepeatedGroupAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EndOfRepeatedGroupAssembler.java
index d4c5cce..805f493 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EndOfRepeatedGroupAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EndOfRepeatedGroupAssembler.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.column.assembler;
 
 import org.apache.asterix.column.assembler.value.MissingValueGetter;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -36,6 +37,11 @@
     }
 
     @Override
+    public void reset(AbstractBytesInputStream in, int numberOfTuples) throws HyracksDataException {
+        // NoOp
+    }
+
+    @Override
     public int next(AssemblerState state) throws HyracksDataException {
         // Get the current delimiter index from the reader
         int delimiterIndex = reader.getDelimiterIndex();
@@ -73,11 +79,6 @@
     }
 
     @Override
-    public boolean isEndOfGroupAssembler() {
-        return true;
-    }
-
-    @Override
     public void skip(int count) throws HyracksDataException {
         // noOp
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
index 8dcf024..be46333 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
@@ -19,22 +19,34 @@
 package org.apache.asterix.column.assembler;
 
 import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-public class PrimitiveValueAssembler extends AbstractPrimitiveValueAssembler {
-    int counter = 0;
+final class PrimitiveValueAssembler extends AbstractPrimitiveValueAssembler {
+    private final boolean primaryKey;
 
-    PrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader, IValueGetter primitiveValue) {
+    PrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader, IValueGetter primitiveValue,
+            boolean primaryKey) {
         super(level, info, reader, primitiveValue);
+        this.primaryKey = primaryKey;
+    }
+
+    @Override
+    public void reset(AbstractBytesInputStream in, int numberOfTuples) throws HyracksDataException {
+        // Do not skip PK readers as they are maintained by the cursor
+        if (!primaryKey) {
+            reader.reset(in, numberOfTuples);
+        }
     }
 
     @Override
     public int next(AssemblerState state) throws HyracksDataException {
-        if (!reader.next()) {
+        // Do not call next on PK readers as they are maintained by the cursor
+        if (!primaryKey && !reader.next()) {
             throw createException();
         } else if (reader.isNull() && (isDelegate() || reader.getLevel() + 1 == level)) {
             addNullToAncestor(reader.getLevel());
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java
index 6588972..502f642 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java
@@ -19,13 +19,14 @@
 package org.apache.asterix.column.assembler;
 
 import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-class RepeatedPrimitiveValueAssembler extends AbstractPrimitiveValueAssembler {
+final class RepeatedPrimitiveValueAssembler extends AbstractPrimitiveValueAssembler {
     private boolean arrayDelegate;
 
     RepeatedPrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader,
@@ -35,6 +36,11 @@
     }
 
     @Override
+    public void reset(AbstractBytesInputStream in, int numberOfTuples) throws HyracksDataException {
+        reader.reset(in, numberOfTuples);
+    }
+
+    @Override
     public int next(AssemblerState state) throws HyracksDataException {
         /*
          * Move to the next value if one of the following is true
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
index b53ded2..97e5746 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
@@ -42,9 +42,20 @@
      */
     public abstract void close();
 
+    /**
+     * @return the current (mostly) overestimated size needed to flush this writer
+     */
     public abstract int getEstimatedSize();
 
     /**
+     * @param length the length of value to be return
+     * @return (probably) an overestimated size needed to write a value with the given length
+     */
+    public int calculateEstimatedSize(int length) {
+        return length;
+    }
+
+    /**
      * @return the allocated size of the buffer
      */
     public abstract int getAllocatedSize();
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
index 63697bc..96c8bdc 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
@@ -81,6 +81,12 @@
     }
 
     @Override
+    public int calculateEstimatedSize(int length) {
+        // length of the string + 4 bytes for its offset
+        return length + Integer.BYTES;
+    }
+
+    @Override
     public int getAllocatedSize() {
         return offsetStream.capacity() + valueStream.size();
     }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/DummyBytesInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/DummyBytesInputStream.java
similarity index 94%
rename from asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/DummyBytesInputStream.java
rename to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/DummyBytesInputStream.java
index 0fc8a26..3a7276d 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/DummyBytesInputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/DummyBytesInputStream.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.column.common.buffer;
+package org.apache.asterix.column.bytes.stream.in;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index d0b228f..c5c1753 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.asterix.column.values.writer.ColumnBatchWriter;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
@@ -38,6 +39,7 @@
     private final ColumnTransformer transformer;
     private final RecordLazyVisitablePointable pointable;
     private final int maxNumberOfTuples;
+    private final IColumnValuesWriter[] primaryKeyWriters;
 
     protected int primaryKeysEstimatedSize;
 
@@ -49,6 +51,12 @@
         writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
         this.maxNumberOfTuples = maxNumberOfTuples;
         pointable = new TypedRecordLazyVisitablePointable(columnMetadata.getDatasetType());
+
+        int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+        primaryKeyWriters = new IColumnValuesWriter[numberOfPrimaryKeys];
+        for (int i = 0; i < numberOfPrimaryKeys; i++) {
+            primaryKeyWriters[i] = columnMetadata.getWriter(i);
+        }
     }
 
     @Override
@@ -65,7 +73,7 @@
     public final int bytesRequired(ITupleReference tuple) {
         int primaryKeysSize = 0;
         for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
-            primaryKeysSize += tuple.getFieldLength(i);
+            primaryKeysSize += primaryKeyWriters[i].getEstimatedSize(tuple.getFieldLength(i));
         }
 
         //Mostly it is an overestimated size
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index dfd7d6f..0df4aca 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -77,7 +77,7 @@
     public int bytesRequired(ITupleReference tuple) {
         int primaryKeysSize = 0;
         for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
-            primaryKeysSize += tuple.getFieldLength(i);
+            primaryKeysSize += primaryKeyWriters[i].getEstimatedSize(tuple.getFieldLength(i));
         }
 
         return primaryKeysSize;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
index 97cc740..b8d9260 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
@@ -83,12 +83,13 @@
             throws HyracksDataException {
         super(datasetType, metaType, primaryKeyReaders.length, serializedMetadata, -1);
         this.fieldNamesDictionary = fieldNamesDictionary;
-        this.assembler = new ColumnAssembler(root, datasetType, this, readerFactory, valueGetterFactory);
         this.primaryKeyReaders = primaryKeyReaders;
         this.normalizedFilterEvaluator = normalizedFilterEvaluator;
         this.filterValueAccessors = filterValueAccessors;
         this.columnFilterEvaluator = columnFilterEvaluator;
         this.filterColumnReaders = filterColumnReaders;
+        // Must be the last value to set as it depends on 'QueryColumnMetadata.this'
+        this.assembler = new ColumnAssembler(root, datasetType, this, readerFactory, valueGetterFactory);
     }
 
     public final ColumnAssembler getAssembler() {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
index 01e9742..8beae4e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.column.assembler.value.ValueGetterFactory;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.bytes.stream.in.ByteBufferInputStream;
+import org.apache.asterix.column.bytes.stream.in.DummyBytesInputStream;
 import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
@@ -37,10 +38,14 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.AbstractColumnTupleReference;
 import org.apache.hyracks.storage.common.MultiComparator;
 
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public abstract class AbstractAsterixColumnTupleReference extends AbstractColumnTupleReference {
     private final IValueGetter[] primaryKeysValueGetters;
     protected final ByteBufferInputStream[] primaryKeyStreams;
@@ -59,6 +64,7 @@
         primaryKeys = new VoidPointable[numberOfPrimaryKeys];
 
         for (int i = 0; i < numberOfPrimaryKeys; i++) {
+            // Shared with the assembler PK readers (if assembler requires PK readers)
             primaryKeyStreams[i] = new ByteBufferInputStream();
             primaryKeysValueGetters[i] =
                     ValueGetterFactory.INSTANCE.createValueGetter(primaryKeyReaders[i].getTypeTag());
@@ -70,7 +76,8 @@
             if (info.getColumnIndex(i) >= numberOfPrimaryKeys) {
                 columnStreams[i] = new MultiByteBufferInputStream();
             } else {
-                columnStreams[i] = new ByteBufferInputStream();
+                // Assembler's PK readers are shared with the cursor's PK readers
+                columnStreams[i] = DummyBytesInputStream.INSTANCE;
             }
         }
     }
@@ -78,12 +85,12 @@
     protected abstract PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
 
     @Override
-    protected void setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException {
-        for (int i = 0; i < primaryKeyReaders.length; i++) {
-            PrimitiveColumnValuesReader reader = primaryKeyReaders[i];
-            reader.reset(index, skipCount);
-            primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+    protected int setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException {
+        int numberOfSkippedAntiMatters = resetPrimaryKeyReader(0, index, skipCount);
+        for (int i = 1; i < primaryKeyReaders.length; i++) {
+            resetPrimaryKeyReader(i, index, skipCount);
         }
+        return skipCount - numberOfSkippedAntiMatters;
     }
 
     @Override
@@ -228,4 +235,22 @@
             primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
         }
     }
+
+    protected void appendExceptionInformation(ColumnarValueException e) {
+        ObjectNode node = e.createNode(getClass().getSimpleName());
+        node.put("isAntiMatter", isAntimatter());
+        ArrayNode pkNodes = node.putArray("primaryKeyReaders");
+        for (IColumnValuesReader reader : primaryKeyReaders) {
+            reader.appendReaderInformation(pkNodes.addObject());
+        }
+    }
+
+    private int resetPrimaryKeyReader(int i, int index, int skipCount) throws HyracksDataException {
+        PrimitiveColumnValuesReader reader = primaryKeyReaders[i];
+        // Returns the number of encountered anti-matters
+        int numberOfSkippedAntiMatters = reader.reset(index, skipCount);
+        primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+        // include the current key if the current key is an anti-matter
+        return numberOfSkippedAntiMatters + (isAntimatter() ? 1 : 0);
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
index 32d25e3..c9b1eac 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
 
 public final class QueryColumnTupleReference extends AbstractAsterixColumnTupleReference {
@@ -123,7 +124,13 @@
     }
 
     public IValueReference getAssembledValue() throws HyracksDataException {
-        return filterApplier.getTuple();
+        try {
+            return filterApplier.getTuple();
+        } catch (ColumnarValueException e) {
+            appendExceptionInformation(e);
+            throw e;
+        }
+
     }
 
     private IFilterApplier createFilterApplier() {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
index e2858ae..ea3b4b7 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
 
 public final class QueryColumnWithMetaTupleReference extends AbstractAsterixColumnTupleReference {
@@ -133,11 +134,21 @@
     }
 
     public IValueReference getAssembledValue() throws HyracksDataException {
-        return filterApplier.getTuple();
+        try {
+            return filterApplier.getTuple();
+        } catch (ColumnarValueException e) {
+            appendExceptionInformation(e);
+            throw e;
+        }
     }
 
     public IValueReference getMetaAssembledValue() throws HyracksDataException {
-        return metaAssembler.nextValue();
+        try {
+            return metaAssembler.nextValue();
+        } catch (ColumnarValueException e) {
+            appendExceptionInformation(e);
+            throw e;
+        }
     }
 
     private IFilterApplier createFilterApplier() {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
index 4ee2780..fc5baf7 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
@@ -29,8 +29,10 @@
      * Reset the reader at the given index
      *
      * @param startIndex start index
+     * @param skipCount  number of values should be skipped
+     * @return return the number of encountered anti-matters
      */
-    void reset(int startIndex, int skipCount) throws HyracksDataException;
+    int reset(int startIndex, int skipCount) throws HyracksDataException;
 
     /**
      * Returns the value of the key at the given index
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
index d4e6099..2e2aa9e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
@@ -98,6 +98,12 @@
     int getEstimatedSize();
 
     /**
+     * @param length the length of value to be return
+     * @return (probably) an overestimated size needed to write a value with the given length
+     */
+    int getEstimatedSize(int length);
+
+    /**
      * @return the allocated space in bytes
      */
     int getAllocatedSpace();
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
index 7fd206a..021b9f5 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
@@ -41,6 +41,7 @@
     protected final int maxLevel;
     protected final ParquetRunLengthBitPackingHybridDecoder definitionLevels;
     protected final AbstractBytesInputStream valuesStream;
+    private final boolean primaryKey;
     protected int level;
     protected int valueCount;
     protected int valueIndex;
@@ -49,7 +50,7 @@
     private boolean nullLevel;
     private boolean allMissing;
 
-    // Logging members
+    // For logging purposes only
     private int numberOfEncounteredMissing;
     private int numberOfEncounteredNull;
 
@@ -59,6 +60,7 @@
         this.maxLevel = maxLevel;
         definitionLevels = new ParquetRunLengthBitPackingHybridDecoder(ColumnValuesUtil.getBitWidth(maxLevel));
         valuesStream = primaryKey ? new ByteBufferInputStream() : new MultiByteBufferInputStream();
+        this.primaryKey = primaryKey;
     }
 
     final void nextLevel() throws HyracksDataException {
@@ -72,6 +74,8 @@
         nullLevel = ColumnValuesUtil.isNull(nullBitMask, actualLevel);
         //Clear the null bit to allow repeated value readers determine the correct delimiter for null values
         level = ColumnValuesUtil.clearNullBit(nullBitMask, actualLevel);
+
+        // For logging purposes only
         numberOfEncounteredMissing += isMissing() ? 1 : 0;
         numberOfEncounteredNull += isNull() ? 1 : 0;
     }
@@ -170,6 +174,23 @@
 
     @Override
     public void skip(int count) throws HyracksDataException {
+        if (primaryKey) {
+            /*
+             * Do not modify the position of primary key (PK) reader as it is maintained by the cursor.
+             * Previously, we used two separate primary key readers
+             * 1- One for the cursor
+             *   - Its position maintained by the cursor
+             * 2- And one for assembler (if the primary key is requested -- like in SELECT *)
+             *   - Its position maintained by calling this skip method
+             * In the previous approach, maintaining the positions of two primary key(s) readers were messy,
+             * as we needed to re-sync the assembler reader with the cursor PK reader. The reason is that
+             * anti-matters are handled at the cursor level. When anti-matters are processed, they are skipped --
+             * making the assembler PK reader out of sync.
+             *
+             * Additionally, maintaining two readers that are decoding the same values (twice) is unnecessary.
+             */
+            return;
+        }
         for (int i = 0; i < count; i++) {
             next();
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
index c6d2b1e..5f28995 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
@@ -116,12 +116,17 @@
     }
 
     @Override
-    public void reset(int startIndex, int skipCount) throws HyracksDataException {
+    public int reset(int startIndex, int skipCount) throws HyracksDataException {
         ((IColumnKeyValueReader) valueReader).reset(startIndex, skipCount);
+        // first item
         nextLevel();
-        for (int i = 1; i < skipCount; i++) {
+        int numberOfAntiMatters = level < maxLevel ? 1 : 0;
+        for (int i = 0; i < skipCount; i++) {
+            // we should skip to index+=skipCount
             nextLevel();
+            numberOfAntiMatters += level < maxLevel ? 1 : 0;
         }
+        return numberOfAntiMatters;
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
index fc458f3..28415f8 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
@@ -45,8 +45,9 @@
     }
 
     @Override
-    public void reset(int startIndex, int skipCount) {
+    public int reset(int startIndex, int skipCount) {
         getValue(startIndex);
+        return 0;
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
index d6d7c4c..17225b9 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
@@ -51,8 +51,9 @@
     }
 
     @Override
-    public void reset(int startIndex, int skipCount) {
+    public int reset(int startIndex, int skipCount) {
         getValue(startIndex);
+        return 0;
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
index 87eda82..5e5d6e4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
@@ -38,6 +38,8 @@
 import org.apache.parquet.bytes.BytesUtils;
 
 public abstract class AbstractColumnValuesWriter implements IColumnValuesWriter {
+    // For 3 integers (count, defSize, and valueSize)
+    private static final int COUNT_DEF_SIZE_VALUE_SIZE = Integer.BYTES * 3;
     protected final AbstractColumnFilterWriter filterWriter;
     protected final ParquetRunLengthBitPackingHybridEncoder definitionLevels;
     protected final int level;
@@ -65,7 +67,13 @@
 
     @Override
     public final int getEstimatedSize() {
-        return definitionLevels.getEstimatedSize() + getValuesEstimatedSize();
+        return definitionLevels.getEstimatedSize() + getValuesEstimatedSize() + COUNT_DEF_SIZE_VALUE_SIZE;
+    }
+
+    @Override
+    public int getEstimatedSize(int length) {
+        // + 1 byte for the definition level
+        return Byte.BYTES + calculateEstimatedSize(length);
     }
 
     @Override
@@ -234,6 +242,8 @@
 
     protected abstract int getValuesEstimatedSize();
 
+    protected abstract int calculateEstimatedSize(int length);
+
     protected abstract int getValuesAllocatedSize();
 
     protected abstract AbstractColumnFilterWriter createFilter();
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
index 0058f18..4bd6e27 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
@@ -59,6 +59,11 @@
     }
 
     @Override
+    protected int calculateEstimatedSize(int length) {
+        return 1;
+    }
+
+    @Override
     protected int getValuesAllocatedSize() {
         return booleanWriter.getAllocatedSize();
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
index 5963d23..9e6f906 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
@@ -87,6 +87,11 @@
     }
 
     @Override
+    protected int calculateEstimatedSize(int length) {
+        return doubleWriter.calculateEstimatedSize(length);
+    }
+
+    @Override
     protected int getValuesAllocatedSize() {
         return doubleWriter.getAllocatedSize();
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
index e6ada55..6e41af7 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -84,6 +84,11 @@
     }
 
     @Override
+    protected int calculateEstimatedSize(int length) {
+        return longWriter.calculateEstimatedSize(length);
+    }
+
+    @Override
     protected int getValuesAllocatedSize() {
         return longWriter.getAllocatedSize();
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
index edc9fe2..2d9f5bf 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
@@ -76,6 +76,11 @@
     }
 
     @Override
+    protected int calculateEstimatedSize(int length) {
+        return 0;
+    }
+
+    @Override
     protected int getValuesAllocatedSize() {
         return 0;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
index b0d5a93..5b1977f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
@@ -72,6 +72,11 @@
     }
 
     @Override
+    protected int calculateEstimatedSize(int length) {
+        return stringWriter.calculateEstimatedSize(length);
+    }
+
+    @Override
     protected final int getValuesAllocatedSize() {
         return stringWriter.getAllocatedSize();
     }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
index fcb228f..f336103 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
@@ -31,7 +31,7 @@
 
 import org.apache.asterix.column.assembler.value.DummyValueGetterFactory;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
-import org.apache.asterix.column.common.buffer.DummyBytesInputStream;
+import org.apache.asterix.column.bytes.stream.in.DummyBytesInputStream;
 import org.apache.asterix.column.common.test.TestCase;
 import org.apache.asterix.column.filter.NoOpColumnFilterEvaluatorFactory;
 import org.apache.asterix.column.operation.query.ColumnAssembler;
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
index 2e1dec9..d287417 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
@@ -109,6 +109,11 @@
     }
 
     @Override
+    public int getEstimatedSize(int length) {
+        return length;
+    }
+
+    @Override
     public int getAllocatedSpace() {
         return 0;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/error/ColumnarValueException.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/error/ColumnarValueException.java
index 476e1a1..79c7864 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/error/ColumnarValueException.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/error/ColumnarValueException.java
@@ -23,7 +23,7 @@
 
 /**
  * This exception is thrown when an error is encountered in columnar collections
- * It contains a string builder to collect information about the encountered error
+ * It contains an OBJECT_MAPPER to collect information about the encountered error in JSON format
  */
 public class ColumnarValueException extends IllegalStateException {
     private static final long serialVersionUID = 1513428477557736034L;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeRangeSearchCursor.java
index 75b2c72..be5837f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTreeRangeSearchCursor.java
@@ -34,8 +34,11 @@
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.IIndexCursorStats;
 import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class LSMColumnBTreeRangeSearchCursor extends LSMBTreeRangeSearchCursor {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final List<IColumnTupleIterator> componentTupleList;
 
     public LSMColumnBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
@@ -75,7 +78,10 @@
             return;
         }
         IColumnTupleIterator columnTuple = (IColumnTupleIterator) e.getTuple();
-        columnTuple.skip(1);
+        if (!columnTuple.isAntimatter()) {
+            // Skip non-key columns
+            columnTuple.skip(1);
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index d3b4673..0ac8c75 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -82,7 +82,7 @@
             if (columnIndex >= numberOfPrimaryKeys) {
                 buffersProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp);
             } else {
-                buffersProviders[i] = new ColumnSingleBufferProvider(columnIndex);
+                buffersProviders[i] = DummyColumnBufferProvider.INSTANCE;
             }
         }
         totalNumberOfMegaLeafNodes = 0;
@@ -114,7 +114,12 @@
         int numberOfTuples = frame.getTupleCount();
         //Start new page and check whether we should skip reading non-key columns or not
         boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
-        setPrimaryKeysAt(startIndex, startIndex);
+        /*
+         * When startIndex = 0, a call to next() is performed to get the information of the PK
+         * and 0 skips will be performed. If startIndex (for example) is 5, a call to next() will be performed
+         * then 4 skips will be performed.
+         */
+        int skipCount = setPrimaryKeysAt(startIndex, startIndex);
         if (readColumnPages) {
             for (int i = 0; i < filterBufferProviders.length; i++) {
                 IColumnBufferProvider provider = filterBufferProviders[i];
@@ -133,8 +138,13 @@
                 provider.reset(frame);
                 startColumn(provider, i, numberOfTuples);
             }
-            // Skip until before startIndex (i.e. stop at startIndex - 1)
-            skip(startIndex);
+            /*
+             * skipCount can be < 0 for cases when the tuples in the range [0, startIndex] are all anti-matters.
+             * Consequently, tuples in the range [0, startIndex] do not have any non-key columns. Thus, the returned
+             * skipCount from calling setPrimaryKeysAt(startIndex, startIndex) is a negative value. For that reason,
+             * non-key column should not skip any value.
+             */
+            skip(Math.max(skipCount, 0));
         } else {
             numOfSkippedMegaLeafNodes++;
         }
@@ -143,14 +153,25 @@
 
     @Override
     public final void setAt(int startIndex) throws HyracksDataException {
-        int skipCount = startIndex - tupleIndex;
+        /*
+         * Let say that tupleIndex = 5 and startIndex = 12
+         * Then, skipCount = 12 - 5 - 1 = 6.
+         */
+        int skipCount = startIndex - tupleIndex - 1;
         tupleIndex = startIndex;
-        setPrimaryKeysAt(startIndex, skipCount);
-        // -1 because next would be called for all columns
-        skip(skipCount - 1);
+        /*
+         * As in reset(int startIndex, int endIndex) above, a call to next() will be performed followed by 6 skips.
+         * So, the reader will be moved forward 7 positions (5 + 7 = 12). Hence, the PK will be exactly at index 12.
+         */
+        skipCount = setPrimaryKeysAt(startIndex, skipCount);
+        /*
+         * For values, we need to do 6 skips, as next will be called later by the assembler
+         * -- setting the position at 12 as well.
+         */
+        skip(skipCount);
     }
 
-    protected abstract void setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException;
+    protected abstract int setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException;
 
     protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples)
             throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/DummyColumnBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/DummyColumnBufferProvider.java
new file mode 100644
index 0000000..f99ed21
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/DummyColumnBufferProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+public class DummyColumnBufferProvider implements IColumnBufferProvider {
+    public static final IColumnBufferProvider INSTANCE = new DummyColumnBufferProvider();
+
+    private DummyColumnBufferProvider() {
+    }
+
+    @Override
+    public void reset(ColumnBTreeReadLeafFrame frame) throws HyracksDataException {
+
+    }
+
+    @Override
+    public void readAll(Queue<ByteBuffer> buffers) throws HyracksDataException {
+
+    }
+
+    @Override
+    public void releaseAll() throws HyracksDataException {
+
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return null;
+    }
+
+    @Override
+    public int getLength() {
+        return 0;
+    }
+
+    @Override
+    public int getColumnIndex() {
+        return 0;
+    }
+}