[ASTERIXDB-3165][STO] Avoid encoding primary keys

- user model changes: no
- storage format changes: yes
- interface changes: no

Details:
This patch removes the encoding for PKs to
allow for more efficient point lookups --
especially for upserting with secondary indexes.

Storage changes:
PKs are no longer encoded.

Change-Id: Id361d0d41b54a7ea84ec9212506809f5e0befc84
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17486
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
index 9f1809d..5e2ef28 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
@@ -39,10 +39,8 @@
         this.reader = reader;
     }
 
-    public final void reset(AbstractBytesInputStream in, int startIndex, int numberOfTuples)
-            throws HyracksDataException {
+    public final void reset(AbstractBytesInputStream in, int numberOfTuples) throws HyracksDataException {
         reader.reset(in, numberOfTuples);
-        reader.skip(startIndex);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
deleted file mode 100644
index 196bec2..0000000
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.column.bytes.decoder;
-
-import java.io.IOException;
-
-import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
-import org.apache.parquet.io.ParquetDecodingException;
-
-public class ParquetDoublePlainValuesReader extends AbstractParquetValuesReader {
-    private LittleEndianDataInputStream in;
-
-    @Override
-    public void initFromPage(AbstractBytesInputStream stream) throws IOException {
-        this.in = new LittleEndianDataInputStream(stream.remainingStream());
-    }
-
-    @Override
-    public void skip() {
-        try {
-            in.skipBytes(8);
-        } catch (IOException e) {
-            throw new ParquetDecodingException("could not skip double", e);
-        }
-    }
-
-    @Override
-    public double readDouble() {
-        try {
-            return in.readDouble();
-        } catch (IOException e) {
-            throw new ParquetDecodingException("could not read double", e);
-        }
-    }
-}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
new file mode 100644
index 0000000..07713e1
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.bytes.stream.in.ValueInputStream;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class ParquetPlainFixedLengthValuesReader extends AbstractParquetValuesReader {
+    private final ValueInputStream in;
+    private final int valueLength;
+    private final IPointable valueStorage;
+
+    public ParquetPlainFixedLengthValuesReader(int valueLength) {
+        in = new ValueInputStream();
+        this.valueLength = valueLength;
+        valueStorage = null;
+    }
+
+    public ParquetPlainFixedLengthValuesReader(IPointable valueStorage) {
+        in = new ValueInputStream();
+        this.valueLength = valueStorage.getByteArray().length;
+        this.valueStorage = valueStorage;
+    }
+
+    @Override
+    public void initFromPage(AbstractBytesInputStream stream) throws EOFException {
+        in.reset(stream.remainingStream());
+    }
+
+    @Override
+    public void skip() {
+        try {
+            in.skipBytes(valueLength);
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not skip double", e);
+        }
+    }
+
+    @Override
+    public long readLong() {
+        try {
+            return in.readLong();
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read double", e);
+        }
+    }
+
+    @Override
+    public double readDouble() {
+        try {
+            return in.readDouble();
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read double", e);
+        }
+    }
+
+    @Override
+    public IValueReference readBytes() {
+        try {
+            return in.readBytes(valueStorage, valueLength);
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read bytes", e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
index 1b46116..4b09e4d 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
@@ -91,6 +91,8 @@
         int start = value.getStartOffset();
         int length = value.getLength();
         if (skipLengthBytes) {
+            // Length bytes are skipped so the prefix encoding works properly (e.g., "123", "1234")
+            // the prefix "123" is a common substring between the two; however, their lengths are not
             int lengthBytes = UTF8StringUtil.getNumBytesToStoreLength(bytes, start);
             start += lengthBytes;
             length -= lengthBytes;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
similarity index 65%
rename from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
rename to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
index 0298e59..2aba7d2 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
@@ -19,31 +19,46 @@
 package org.apache.asterix.column.bytes.encoder;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
 
 import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
 import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.ValueOutputStream;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.LittleEndianDataOutputStream;
 import org.apache.parquet.column.values.plain.PlainValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
 
 /**
  * Re-implementation of {@link PlainValuesWriter}
  */
-public class ParquetPlainValuesWriter extends AbstractParquetValuesWriter {
-    public static final Charset CHARSET = StandardCharsets.UTF_8;
-
+public class ParquetPlainFixedLengthValuesWriter extends AbstractParquetValuesWriter {
     private final AbstractBytesOutputStream outputStream;
-    private final LittleEndianDataOutputStream out;
+    private final ValueOutputStream out;
 
-    public ParquetPlainValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+    public ParquetPlainFixedLengthValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
         outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
-        out = new LittleEndianDataOutputStream(outputStream);
+        out = new ValueOutputStream(outputStream);
+    }
+
+    @Override
+    public void writeInteger(int v) {
+        try {
+            out.writeInt(v);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write int", e);
+        }
+    }
+
+    @Override
+    public void writeLong(long v) {
+        try {
+            out.writeLong(v);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write long", e);
+        }
     }
 
     @Override
@@ -55,6 +70,21 @@
         }
     }
 
+    /**
+     * Should only be used for UUID
+     *
+     * @param v               the value to encode
+     * @param skipLengthBytes ignored
+     */
+    @Override
+    public void writeBytes(IValueReference v, boolean skipLengthBytes) {
+        try {
+            out.write(v.getByteArray(), v.getStartOffset(), v.getLength());
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write bytes", e);
+        }
+    }
+
     @Override
     public BytesInput getBytes() {
         try {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
new file mode 100644
index 0000000..63697bc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.GrowableBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.ValueOutputStream;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.io.ParquetEncodingException;
+
+public class ParquetPlainVariableLengthValuesWriter extends AbstractParquetValuesWriter {
+    private final GrowableBytesOutputStream offsetStream;
+    private final AbstractBytesOutputStream valueStream;
+    private final ValueOutputStream offsetWriterStream;
+
+    public ParquetPlainVariableLengthValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        offsetStream = new GrowableBytesOutputStream();
+        valueStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+        offsetWriterStream = new ValueOutputStream(offsetStream);
+    }
+
+    @Override
+    public void writeBytes(IValueReference v, boolean skipLengthBytes) {
+        try {
+            offsetWriterStream.writeInt(valueStream.size());
+            valueStream.write(v);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write bytes", e);
+        }
+    }
+
+    @Override
+    public BytesInput getBytes() {
+        try {
+            offsetStream.flush();
+            valueStream.flush();
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write page", e);
+        }
+        return BytesInput.concat(offsetStream.asBytesInput(), valueStream.asBytesInput());
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        offsetStream.reset();
+        valueStream.reset();
+    }
+
+    @Override
+    public void close() {
+        offsetStream.finish();
+        valueStream.finish();
+    }
+
+    @Override
+    public int getEstimatedSize() {
+        return offsetStream.size() + valueStream.size();
+    }
+
+    @Override
+    public int getAllocatedSize() {
+        return offsetStream.capacity() + valueStream.size();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
index b50143b..034df66 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
@@ -61,6 +61,10 @@
     @Override
     public abstract int available();
 
+    public ByteBuffer getBuffer() {
+        throw new UnsupportedOperationException("Getting buffer is not supported");
+    }
+
     public final void skipFully(long n) throws IOException {
         long skipped = skip(n);
         if (skipped < n) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
index 833765c..9c3dc08 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
@@ -166,4 +166,9 @@
     public int available() {
         return buffer.remaining();
     }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
new file mode 100644
index 0000000..ee975f1
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.parquet.bytes.LittleEndianDataInputStream;
+
+/**
+ * Re-implementation of {@link LittleEndianDataInputStream}
+ */
+public final class ValueInputStream extends InputStream {
+    private final byte[] readBuffer;
+    private InputStream in;
+
+    public ValueInputStream() {
+        readBuffer = new byte[8];
+    }
+
+    public void reset(AbstractBytesInputStream in) {
+        this.in = in;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return in.read();
+    }
+
+    public int readInt() throws IOException {
+        readFully(readBuffer, Integer.BYTES);
+        return IntegerPointable.getInteger(readBuffer, 0);
+    }
+
+    public long readLong() throws IOException {
+        readFully(readBuffer, Long.BYTES);
+        return LongPointable.getLong(readBuffer, 0);
+    }
+
+    public double readDouble() throws IOException {
+        readFully(readBuffer, Double.BYTES);
+        return DoublePointable.getDouble(readBuffer, 0);
+    }
+
+    public IValueReference readBytes(IPointable valueStorage, int length) throws IOException {
+        readFully(valueStorage.getByteArray(), length);
+        return valueStorage;
+    }
+
+    public void skipBytes(int n) throws IOException {
+        int total = 0;
+        int cur;
+
+        while ((total < n) && ((cur = (int) in.skip(n - total)) > 0)) {
+            total += cur;
+        }
+    }
+
+    private void readFully(byte[] bytes, int len) throws IOException {
+        if (len < 0) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            int count;
+            for (int n = 0; n < len; n += count) {
+                count = this.in.read(bytes, n, len - n);
+                if (count < 0) {
+                    throw new EOFException();
+                }
+            }
+
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
new file mode 100644
index 0000000..a106a00
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+
+public final class ValueOutputStream extends OutputStream {
+    private final OutputStream out;
+    private final byte[] writeBuffer;
+
+    public ValueOutputStream(OutputStream out) {
+        this.out = out;
+        writeBuffer = new byte[8];
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    public void writeInt(int value) throws IOException {
+        IntegerPointable.setInteger(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Integer.BYTES);
+    }
+
+    public void writeLong(long value) throws IOException {
+        LongPointable.setLong(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Long.BYTES);
+    }
+
+    public void writeDouble(double value) throws IOException {
+        DoublePointable.setDouble(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Double.BYTES);
+    }
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
index 2e60ee7..71d3ac6 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
@@ -51,8 +51,8 @@
         tupleIndex = 0;
     }
 
-    public void resetColumn(AbstractBytesInputStream stream, int startIndex, int ordinal) throws HyracksDataException {
-        assemblers.get(ordinal).reset(stream, startIndex, numberOfTuples);
+    public void resetColumn(AbstractBytesInputStream stream, int ordinal) throws HyracksDataException {
+        assemblers.get(ordinal).reset(stream, numberOfTuples);
     }
 
     public int getColumnIndex(int ordinal) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
index f2407d3..b9babf1 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
@@ -31,8 +31,8 @@
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
-import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluatorFactory;
@@ -57,14 +57,14 @@
 public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
     private static final Logger LOGGER = LogManager.getLogger();
     private final FieldNamesDictionary fieldNamesDictionary;
-    private final IColumnValuesReader[] primaryKeyReaders;
+    private final PrimitiveColumnValuesReader[] primaryKeyReaders;
     private final IColumnFilterEvaluator filterEvaluator;
     private final List<IColumnFilterValueAccessor> filterValueAccessors;
 
     protected final ColumnAssembler assembler;
 
     protected QueryColumnMetadata(ARecordType datasetType, ARecordType metaType,
-            IColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
+            PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, IColumnValuesReaderFactory readerFactory,
             IValueGetterFactory valueGetterFactory, IColumnFilterEvaluator filterEvaluator,
             List<IColumnFilterValueAccessor> filterValueAccessors) throws HyracksDataException {
@@ -84,7 +84,7 @@
         return fieldNamesDictionary;
     }
 
-    public final IColumnValuesReader[] getPrimaryKeyReaders() {
+    public final PrimitiveColumnValuesReader[] getPrimaryKeyReaders() {
         return primaryKeyReaders;
     }
 
@@ -169,7 +169,8 @@
         IColumnFilterEvaluator filterEvaluator = filterEvaluatorFactory.create(filterAccessorProvider);
         List<IColumnFilterValueAccessor> filterValueAccessors = filterAccessorProvider.getFilterAccessors();
 
-        IColumnValuesReader[] primaryKeyReaders = createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
+        PrimitiveColumnValuesReader[] primaryKeyReaders =
+                createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
 
         if (LOGGER.isInfoEnabled() && filterEvaluator != TrueColumnFilterEvaluator.INSTANCE) {
             String filterString = filterEvaluator == FalseColumnFilterEvaluator.INSTANCE ? "SKIP_ALL"
@@ -192,14 +193,14 @@
         return clippedRoot;
     }
 
-    protected static IColumnValuesReader[] createPrimaryKeyReaders(DataInput input,
+    protected static PrimitiveColumnValuesReader[] createPrimaryKeyReaders(DataInput input,
             IColumnValuesReaderFactory readerFactory, int numberOfPrimaryKeys) throws IOException {
         //skip number of columns
         input.readInt();
 
-        IColumnValuesReader[] primaryKeyReaders = new IColumnValuesReader[numberOfPrimaryKeys];
+        PrimitiveColumnValuesReader[] primaryKeyReaders = new PrimitiveColumnValuesReader[numberOfPrimaryKeys];
         for (int i = 0; i < numberOfPrimaryKeys; i++) {
-            primaryKeyReaders[i] = readerFactory.createValueReader(input);
+            primaryKeyReaders[i] = (PrimitiveColumnValuesReader) readerFactory.createValueReader(input);
         }
         return primaryKeyReaders;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
index 6f0c974..2de1948 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
@@ -30,8 +30,8 @@
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
-import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluatorFactory;
@@ -51,7 +51,7 @@
     private final ColumnAssembler metaAssembler;
 
     private QueryColumnWithMetaMetadata(ARecordType datasetType, ARecordType metaType,
-            IColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
+            PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
             IColumnValuesReaderFactory readerFactory, IValueGetterFactory valueGetterFactory,
             IColumnFilterEvaluator filterEvaluator, List<IColumnFilterValueAccessor> filterValueAccessors)
@@ -141,7 +141,8 @@
         IColumnFilterEvaluator filterEvaluator = filterEvaluatorFactory.create(filterAccessorProvider);
         List<IColumnFilterValueAccessor> filterValueAccessors = filterAccessorProvider.getFilterAccessors();
 
-        IColumnValuesReader[] primaryKeyReaders = createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
+        PrimitiveColumnValuesReader[] primaryKeyReaders =
+                createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
 
         return new QueryColumnWithMetaMetadata(datasetType, metaType, primaryKeyReaders, serializedMetadata,
                 fieldNamesDictionary, clippedRoot, metaClippedRoot, readerFactory, valueGetterFactory, filterEvaluator,
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
index df6b554..01e9742 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
@@ -18,25 +18,33 @@
  */
 package org.apache.asterix.column.tuple;
 
+import static org.apache.hyracks.storage.am.common.frames.AbstractSlotManager.ERROR_INDICATOR;
+import static org.apache.hyracks.storage.am.common.frames.AbstractSlotManager.GREATEST_KEY_INDICATOR;
+
 import org.apache.asterix.column.assembler.value.IValueGetter;
 import org.apache.asterix.column.assembler.value.ValueGetterFactory;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.bytes.stream.in.ByteBufferInputStream;
 import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
 import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.AbstractColumnTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class AbstractAsterixColumnTupleReference extends AbstractColumnTupleReference {
     private final IValueGetter[] primaryKeysValueGetters;
     protected final ByteBufferInputStream[] primaryKeyStreams;
-    protected final IColumnValuesReader[] primaryKeyReaders;
+    protected final PrimitiveColumnValuesReader[] primaryKeyReaders;
     protected final VoidPointable[] primaryKeys;
     protected final AbstractBytesInputStream[] columnStreams;
 
@@ -67,16 +75,24 @@
         }
     }
 
-    protected abstract IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
+    protected abstract PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
 
     @Override
-    protected final void startPrimaryKey(IColumnBufferProvider provider, int startIndex, int ordinal,
-            int numberOfTuples) throws HyracksDataException {
+    protected void setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException {
+        for (int i = 0; i < primaryKeyReaders.length; i++) {
+            PrimitiveColumnValuesReader reader = primaryKeyReaders[i];
+            reader.reset(index, skipCount);
+            primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+        }
+    }
+
+    @Override
+    protected final void startPrimaryKey(IColumnBufferProvider provider, int ordinal, int numberOfTuples)
+            throws HyracksDataException {
         ByteBufferInputStream primaryKeyStream = primaryKeyStreams[ordinal];
         primaryKeyStream.reset(provider);
         IColumnValuesReader reader = primaryKeyReaders[ordinal];
         reader.reset(primaryKeyStream, numberOfTuples);
-        reader.skip(startIndex);
     }
 
     @Override
@@ -137,4 +153,79 @@
         }
         return compare;
     }
+
+    @Override
+    public int findTupleIndex(ITupleReference searchKey, MultiComparator comparator, FindTupleMode mode,
+            FindTupleNoExactMatchPolicy matchPolicy) throws HyracksDataException {
+        int tupleCount = getTupleCount();
+        if (tupleCount <= 0) {
+            return GREATEST_KEY_INDICATOR;
+        }
+
+        int mid;
+        int begin = tupleIndex;
+        int end = tupleCount - 1;
+
+        while (begin <= end) {
+            mid = (begin + end) / 2;
+
+            setKeyAt(mid);
+            int cmp = comparator.compare(searchKey, this);
+            if (cmp < 0) {
+                end = mid - 1;
+            } else if (cmp > 0) {
+                begin = mid + 1;
+            } else {
+                if (mode == FindTupleMode.EXCLUSIVE) {
+                    if (matchPolicy == FindTupleNoExactMatchPolicy.HIGHER_KEY) {
+                        begin = mid + 1;
+                    } else {
+                        end = mid - 1;
+                    }
+                } else {
+                    if (mode == FindTupleMode.EXCLUSIVE_ERROR_IF_EXISTS) {
+                        return ERROR_INDICATOR;
+                    } else {
+                        return mid;
+                    }
+                }
+            }
+        }
+
+        if (mode == FindTupleMode.EXACT) {
+            return ERROR_INDICATOR;
+        }
+
+        if (matchPolicy == FindTupleNoExactMatchPolicy.HIGHER_KEY) {
+            if (begin > tupleCount - 1) {
+                return GREATEST_KEY_INDICATOR;
+            }
+
+            setKeyAt(begin);
+            if (comparator.compare(searchKey, this) < 0) {
+                return begin;
+            } else {
+                return GREATEST_KEY_INDICATOR;
+            }
+        } else {
+            if (end < 0) {
+                return GREATEST_KEY_INDICATOR;
+            }
+
+            setKeyAt(end);
+            if (comparator.compare(searchKey, this) > 0) {
+                return end;
+            } else {
+                return GREATEST_KEY_INDICATOR;
+            }
+        }
+    }
+
+    protected void setKeyAt(int index) {
+        for (int i = 0; i < primaryKeyReaders.length; i++) {
+            PrimitiveColumnValuesReader reader = primaryKeyReaders[i];
+            reader.getValue(index);
+            primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+        }
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
index c10d415..56b0a57 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
 import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
 import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
@@ -43,11 +44,14 @@
     }
 
     @Override
-    protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+    protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
         MergeColumnReadMetadata columnMetadata = (MergeColumnReadMetadata) info;
         int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
-        IColumnValuesReader[] primaryKeyReaders = new IColumnValuesReader[numberOfPrimaryKeys];
-        System.arraycopy(columnMetadata.getColumnReaders(), 0, primaryKeyReaders, 0, numberOfPrimaryKeys);
+        PrimitiveColumnValuesReader[] primaryKeyReaders = new PrimitiveColumnValuesReader[numberOfPrimaryKeys];
+        IColumnValuesReader[] readers = columnMetadata.getColumnReaders();
+        for (int i = 0; i < numberOfPrimaryKeys; i++) {
+            primaryKeyReaders[i] = (PrimitiveColumnValuesReader) readers[i];
+        }
         return primaryKeyReaders;
     }
 
@@ -55,14 +59,15 @@
     protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples) {
         //Skip filters
         pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
-        skipCount = 0;
+        // skip count is always start from zero as no "search" is conducted during a merge
+        this.skipCount = 0;
         return true;
     }
 
     @Override
-    protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+    protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
             throws HyracksDataException {
-        int numberOfPrimaryKeys = primaryKeys.length;
+        int numberOfPrimaryKeys = primaryKeyStreams.length;
         if (ordinal < numberOfPrimaryKeys) {
             //Skip primary key
             return;
@@ -71,7 +76,6 @@
         columnStream.reset(buffersProvider);
         IColumnValuesReader reader = columnReaders[ordinal];
         reader.reset(columnStream, numberOfTuples);
-        reader.skip(startIndex);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
index e286235..b70bddc 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
@@ -24,7 +24,7 @@
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.operation.query.ColumnAssembler;
 import org.apache.asterix.column.operation.query.QueryColumnMetadata;
-import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterValueAccessor;
@@ -50,7 +50,7 @@
     }
 
     @Override
-    protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+    protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
         return ((QueryColumnMetadata) info).getPrimaryKeyReaders();
     }
 
@@ -69,11 +69,11 @@
     }
 
     @Override
-    protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+    protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
             throws HyracksDataException {
         AbstractBytesInputStream columnStream = columnStreams[ordinal];
         columnStream.reset(buffersProvider);
-        assembler.resetColumn(columnStream, startIndex, ordinal);
+        assembler.resetColumn(columnStream, ordinal);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
index a5cedc1..5b7794e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
@@ -25,7 +25,7 @@
 import org.apache.asterix.column.operation.query.ColumnAssembler;
 import org.apache.asterix.column.operation.query.QueryColumnMetadata;
 import org.apache.asterix.column.operation.query.QueryColumnWithMetaMetadata;
-import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterValueAccessor;
@@ -53,7 +53,7 @@
     }
 
     @Override
-    protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+    protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
         return ((QueryColumnMetadata) info).getPrimaryKeyReaders();
     }
 
@@ -73,15 +73,15 @@
     }
 
     @Override
-    protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+    protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
             throws HyracksDataException {
         AbstractBytesInputStream columnStream = columnStreams[ordinal];
         columnStream.reset(buffersProvider);
         int metaColumnCount = metaAssembler.getNumberOfColumns();
         if (ordinal >= metaColumnCount) {
-            assembler.resetColumn(columnStream, startIndex, ordinal - metaColumnCount);
+            assembler.resetColumn(columnStream, ordinal - metaColumnCount);
         } else {
-            metaAssembler.resetColumn(columnStream, startIndex, ordinal);
+            metaAssembler.resetColumn(columnStream, ordinal);
         }
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
new file mode 100644
index 0000000..4ee2780
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Accessor for key columns
+ */
+public interface IColumnKeyValueReader {
+    /**
+     * Reset the reader at the given index
+     *
+     * @param startIndex start index
+     */
+    void reset(int startIndex, int skipCount) throws HyracksDataException;
+
+    /**
+     * Returns the value of the key at the given index
+     *
+     * @param index tuple index
+     * @return the key value
+     */
+    IValueReference getValue(int index);
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
index 0f4cc0c..fe23b23 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
@@ -104,7 +104,7 @@
     IValueReference getBytes();
 
     /* ***********************
-     * Write function
+     * Write functions
      * ***********************
      */
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
index c0cf18a..41dfea2 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
@@ -59,6 +59,8 @@
         if (allMissing) {
             return;
         }
+
+        valueIndex++;
         try {
             int actualLevel = definitionLevels.readInt();
             //Check whether the level is for a null value
@@ -90,7 +92,7 @@
             valuesStream.resetAt(defLevelsSize, in);
             int valueLength = BytesUtils.readZigZagVarInt(valuesStream);
             if (valueLength > 0) {
-                valueReader.resetValue(valuesStream);
+                valueReader.init(valuesStream, tupleCount);
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
index b233482..bf80580 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
@@ -30,17 +30,21 @@
 import org.apache.asterix.column.values.reader.value.NoOpValueReader;
 import org.apache.asterix.column.values.reader.value.StringValueReader;
 import org.apache.asterix.column.values.reader.value.UUIDValueReader;
+import org.apache.asterix.column.values.reader.value.key.DoubleKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.LongKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.StringKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.UUIDKeyValueReader;
 import org.apache.asterix.om.types.ATypeTag;
 
 public class ColumnValueReaderFactory implements IColumnValuesReaderFactory {
     @Override
     public IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, boolean primaryKey) {
-        return new PrimitiveColumnValuesReader(createReader(typeTag), columnIndex, maxLevel, primaryKey);
+        return new PrimitiveColumnValuesReader(createReader(typeTag, primaryKey), columnIndex, maxLevel, primaryKey);
     }
 
     @Override
     public IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, int[] delimiters) {
-        return new RepeatedPrimitiveColumnValuesReader(createReader(typeTag), columnIndex, maxLevel, delimiters);
+        return new RepeatedPrimitiveColumnValuesReader(createReader(typeTag, false), columnIndex, maxLevel, delimiters);
     }
 
     @Override
@@ -60,7 +64,7 @@
         return createValueReader(typeTag, columnIndex, maxLevel, primaryKey);
     }
 
-    private AbstractValueReader createReader(ATypeTag typeTag) {
+    private AbstractValueReader createReader(ATypeTag typeTag, boolean primaryKey) {
         switch (typeTag) {
             case MISSING:
             case NULL:
@@ -68,13 +72,13 @@
             case BOOLEAN:
                 return new BooleanValueReader();
             case BIGINT:
-                return new LongValueReader();
+                return primaryKey ? new LongKeyValueReader() : new LongValueReader();
             case DOUBLE:
-                return new DoubleValueReader();
+                return primaryKey ? new DoubleKeyValueReader() : new DoubleValueReader();
             case STRING:
-                return new StringValueReader();
+                return primaryKey ? new StringKeyValueReader() : new StringValueReader();
             case UUID:
-                return new UUIDValueReader();
+                return primaryKey ? new UUIDKeyValueReader() : new UUIDValueReader();
             default:
                 throw new UnsupportedOperationException(typeTag + " is not supported");
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
index e8c7bc5..7ae2954 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
@@ -20,14 +20,16 @@
 
 import java.io.IOException;
 
+import org.apache.asterix.column.values.IColumnKeyValueReader;
 import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.asterix.column.values.reader.value.AbstractValueReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 
 /**
  * Reader for a non-repeated primitive value
  */
-public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReader {
+public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReader implements IColumnKeyValueReader {
     /**
      * A primary key value is always present. Anti-matter can be determined by checking whether the definition level
      * indicates that the tuple's values are missing (i.e., by calling {@link #isMissing()}).
@@ -49,7 +51,6 @@
         if (valueIndex == valueCount) {
             return false;
         }
-        valueIndex++;
 
         try {
             nextLevel();
@@ -92,4 +93,18 @@
             }
         }
     }
+
+    @Override
+    public IValueReference getValue(int index) {
+        return ((IColumnKeyValueReader) valueReader).getValue(index);
+    }
+
+    @Override
+    public void reset(int startIndex, int skipCount) throws HyracksDataException {
+        ((IColumnKeyValueReader) valueReader).reset(startIndex, skipCount);
+        nextLevel();
+        for (int i = 1; i < skipCount; i++) {
+            nextLevel();
+        }
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
index 673aa98..1cd424b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
@@ -66,7 +66,6 @@
         if (level == maxLevel) {
             valueReader.nextValue();
         }
-        valueIndex++;
         return true;
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
index 3d4c744..4db082f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
@@ -27,7 +27,7 @@
 
 public abstract class AbstractValueReader implements Comparable<AbstractValueReader> {
 
-    public abstract void resetValue(AbstractBytesInputStream in) throws IOException;
+    public abstract void init(AbstractBytesInputStream in, int tupleCount) throws IOException;
 
     public abstract void nextValue() throws HyracksDataException;
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
index 3417773..6b5e0d4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
@@ -34,7 +34,7 @@
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) {
+    public void init(AbstractBytesInputStream in, int tupleCount) {
         booleanReader.reset(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
index 24155f2..faa60d1 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
@@ -20,20 +20,20 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.column.bytes.decoder.ParquetDoublePlainValuesReader;
+import org.apache.asterix.column.bytes.decoder.ParquetPlainFixedLengthValuesReader;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.om.types.ATypeTag;
 
 public final class DoubleValueReader extends AbstractValueReader {
-    private final ParquetDoublePlainValuesReader doubleReader;
+    private final ParquetPlainFixedLengthValuesReader doubleReader;
     private double nextValue;
 
     public DoubleValueReader() {
-        doubleReader = new ParquetDoublePlainValuesReader();
+        doubleReader = new ParquetPlainFixedLengthValuesReader(Double.BYTES);
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         doubleReader.initFromPage(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
index 09413d9..c22687e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
@@ -33,7 +33,7 @@
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         longReader.initFromPage(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
index fd56ff2..1982c54 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
@@ -31,7 +31,7 @@
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         throw new UnsupportedOperationException(getClass().getName());
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
index 8fd8874..19da3dd 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
@@ -35,7 +35,7 @@
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         stringReader.initFromPage(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
index 4f240e9..7517960 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
@@ -20,22 +20,24 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.column.bytes.decoder.ParquetDeltaByteArrayReader;
+import org.apache.asterix.column.bytes.decoder.ParquetPlainFixedLengthValuesReader;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.dataflow.data.nontagged.comparators.AUUIDPartialBinaryComparatorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public final class UUIDValueReader extends AbstractValueReader {
-    private final ParquetDeltaByteArrayReader uuidReader;
+    private final ParquetPlainFixedLengthValuesReader uuidReader;
     private IValueReference nextValue;
 
     public UUIDValueReader() {
-        uuidReader = new ParquetDeltaByteArrayReader(false);
+        ArrayBackedValueStorage storage = new ArrayBackedValueStorage(16);
+        uuidReader = new ParquetPlainFixedLengthValuesReader(storage);
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         uuidReader.initFromPage(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
new file mode 100644
index 0000000..fc458f3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.IColumnKeyValueReader;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+abstract class AbstractFixedLengthColumnKeyValueReader extends AbstractValueReader implements IColumnKeyValueReader {
+    protected final IPointable value;
+    private ByteBuffer buffer;
+    private int startOffset;
+
+    AbstractFixedLengthColumnKeyValueReader() {
+        value = new VoidPointable();
+    }
+
+    @Override
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
+        buffer = in.getBuffer();
+        startOffset = buffer.position();
+        value.set(null, 0, 0);
+    }
+
+    @Override
+    public void reset(int startIndex, int skipCount) {
+        getValue(startIndex);
+    }
+
+    @Override
+    public IValueReference getValue(int index) {
+        int valueLength = getValueLength();
+        int offset = startOffset + index * valueLength;
+        value.set(buffer.array(), offset, valueLength);
+        return value;
+    }
+
+    @Override
+    public void nextValue() {
+        if (value.getByteArray() == null) {
+            getValue(0);
+            return;
+        }
+        int valueLength = getValueLength();
+        int offset = value.getStartOffset() + valueLength;
+        value.set(buffer.array(), offset, valueLength);
+    }
+
+    protected abstract int getValueLength();
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java
new file mode 100644
index 0000000..cfb8af4
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+
+public final class DoubleKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.DOUBLE;
+    }
+
+    @Override
+    protected int getValueLength() {
+        return Double.BYTES;
+    }
+
+    @Override
+    public double getDouble() {
+        return DoublePointable.getDouble(value.getByteArray(), value.getStartOffset());
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return Double.compare(getDouble(), o.getDouble());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
new file mode 100644
index 0000000..a981dca
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+
+public final class LongKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.BIGINT;
+    }
+
+    @Override
+    protected int getValueLength() {
+        return Long.BYTES;
+    }
+
+    @Override
+    public long getLong() {
+        return LongPointable.getLong(value.getByteArray(), value.getStartOffset());
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return Long.compare(getLong(), o.getLong());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
new file mode 100644
index 0000000..d6d7c4c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.IColumnKeyValueReader;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public final class StringKeyValueReader extends AbstractValueReader implements IColumnKeyValueReader {
+    private final IPointable value;
+    private ByteBuffer buffer;
+    private int startOffset;
+    private int tupleCount;
+
+    public StringKeyValueReader() {
+        value = new VoidPointable();
+    }
+
+    @Override
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
+        buffer = in.getBuffer();
+        startOffset = buffer.position();
+        this.tupleCount = tupleCount;
+        value.set(null, 0, 0);
+    }
+
+    @Override
+    public void reset(int startIndex, int skipCount) {
+        getValue(startIndex);
+    }
+
+    @Override
+    public IValueReference getValue(int index) {
+        byte[] bytes = buffer.array();
+        int indexOffset = startOffset + index * Integer.BYTES;
+        int valueOffset = startOffset + tupleCount * Integer.BYTES + IntegerPointable.getInteger(bytes, indexOffset);
+        int valueLength = UTF8StringUtil.getUTFLength(bytes, valueOffset);
+        valueLength += UTF8StringUtil.getNumBytesToStoreLength(valueLength);
+        value.set(bytes, valueOffset, valueLength);
+        return value;
+    }
+
+    @Override
+    public IValueReference getBytes() {
+        return value;
+    }
+
+    @Override
+    public void nextValue() {
+        if (value.getByteArray() == null) {
+            getValue(0);
+            return;
+        }
+        int offset = value.getStartOffset() + value.getLength();
+        int length = UTF8StringUtil.getUTFLength(buffer.array(), offset);
+        length += UTF8StringUtil.getNumBytesToStoreLength(length);
+        value.set(buffer.array(), offset, length);
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.STRING;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return UTF8StringPointable.compare(getBytes(), o.getBytes());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java
new file mode 100644
index 0000000..141e9f8
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AUUIDPartialBinaryComparatorFactory;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public final class UUIDKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.UUID;
+    }
+
+    @Override
+    protected int getValueLength() {
+        return 16;
+    }
+
+    @Override
+    public IValueReference getBytes() {
+        return value;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return AUUIDPartialBinaryComparatorFactory.compare(getBytes(), o.getBytes());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
index ca5cbb1..5963d23 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
@@ -20,7 +20,7 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.column.bytes.encoder.ParquetPlainValuesWriter;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.DoubleColumnFilterWriter;
@@ -37,12 +37,12 @@
 import org.apache.parquet.bytes.BytesInput;
 
 public final class DoubleColumnValuesWriter extends AbstractColumnValuesWriter {
-    private final ParquetPlainValuesWriter doubleWriter;
+    private final ParquetPlainFixedLengthValuesWriter doubleWriter;
 
     public DoubleColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
         super(columnIndex, level, collection, filtered);
-        doubleWriter = new ParquetPlainValuesWriter(multiPageOpRef);
+        doubleWriter = new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
index e71ec73..e6ada55 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -20,7 +20,9 @@
 
 import java.io.IOException;
 
+import org.apache.asterix.column.bytes.encoder.AbstractParquetValuesWriter;
 import org.apache.asterix.column.bytes.encoder.ParquetDeltaBinaryPackingValuesWriterForLong;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.LongColumnFilterWriter;
@@ -35,12 +37,13 @@
 import org.apache.parquet.bytes.BytesInput;
 
 final class LongColumnValuesWriter extends AbstractColumnValuesWriter {
-    private final ParquetDeltaBinaryPackingValuesWriterForLong longWriter;
+    private final AbstractParquetValuesWriter longWriter;
 
     public LongColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
         super(columnIndex, level, collection, filtered);
-        longWriter = new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef);
+        longWriter = !filtered ? new ParquetPlainFixedLengthValuesWriter(multiPageOpRef)
+                : new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
index e1a3ffd..b0d5a93 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
@@ -20,7 +20,9 @@
 
 import java.io.IOException;
 
+import org.apache.asterix.column.bytes.encoder.AbstractParquetValuesWriter;
 import org.apache.asterix.column.bytes.encoder.ParquetDeltaByteArrayWriter;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainVariableLengthValuesWriter;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.StringColumnFilterWriter;
@@ -32,18 +34,19 @@
 import org.apache.parquet.bytes.BytesInput;
 
 public class StringColumnValuesWriter extends AbstractColumnValuesWriter {
-    private final ParquetDeltaByteArrayWriter stringWriter;
+    private final AbstractParquetValuesWriter stringWriter;
     private final boolean skipLengthBytes;
 
     public StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
-        this(multiPageOpRef, columnIndex, level, collection, filtered, true);
+        this(columnIndex, level, collection, filtered, true, filtered ? new ParquetDeltaByteArrayWriter(multiPageOpRef)
+                : new ParquetPlainVariableLengthValuesWriter(multiPageOpRef));
     }
 
-    protected StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
-            boolean collection, boolean filtered, boolean skipLengthBytes) {
+    protected StringColumnValuesWriter(int columnIndex, int level, boolean collection, boolean filtered,
+            boolean skipLengthBytes, AbstractParquetValuesWriter stringWriter) {
         super(columnIndex, level, collection, filtered);
-        stringWriter = new ParquetDeltaByteArrayWriter(multiPageOpRef);
+        this.stringWriter = stringWriter;
         this.skipLengthBytes = skipLengthBytes;
     }
 
@@ -94,4 +97,5 @@
     protected ATypeTag getTypeTag() {
         return ATypeTag.STRING;
     }
+
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
index 1e98754..9d4ff9a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.column.values.writer;
 
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.UUIDColumnFilterWriter;
 import org.apache.asterix.om.types.ATypeTag;
@@ -28,7 +29,8 @@
 
     public UUIDColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
-        super(multiPageOpRef, columnIndex, level, collection, filtered, false);
+        // UUID is always written without encoding
+        super(columnIndex, level, collection, filtered, false, new ParquetPlainFixedLengthValuesWriter(multiPageOpRef));
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 03c14ae..36ebab9 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -271,7 +271,7 @@
                 LOGGER.info("READ PageZero {}", pageNumber++);
                 assembler.reset(prepareRead(pageZero, providers, streams));
                 for (int i = 0; i < streams.length; i++) {
-                    assembler.resetColumn(streams[i], 0, i);
+                    assembler.resetColumn(streams[i], i);
                 }
                 writeForPageZero(ps, assembler);
             }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
index 8f53ebc..ff088d6 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
@@ -125,7 +125,7 @@
             Pair<PrintStream, ATypeTag> pair = new Pair<>(ps, ATypeTag.OBJECT);
             assembler.reset(numberOfTuples);
             for (int i = 0; i < columnMetadata.getNumberOfColumns(); i++) {
-                assembler.resetColumn(streams[i], 0, i);
+                assembler.resetColumn(streams[i], i);
             }
             while (assembler.hasNext()) {
                 IValueReference record = assembler.nextValue();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
index 0c95500..4034906 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
@@ -19,9 +19,13 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.api;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
@@ -31,11 +35,38 @@
  */
 public interface IColumnTupleIterator extends ILSMTreeTupleReference, Comparable<IColumnTupleIterator> {
     /**
+     * Indicates a new page was set to prepare the iterator
+     */
+    void newPage() throws HyracksDataException;
+
+    /**
      * Reset the iterator starting at the provided index
      *
      * @param startIndex start from the tuple at this index
+     * @param endIndex   stop at this index (exclusive)
      */
-    void reset(int startIndex) throws HyracksDataException;
+    void reset(int startIndex, int endIndex) throws HyracksDataException;
+
+    /**
+     * Set the iterator at a new position
+     * NOTE:
+     * the new start index has to be greater than the current tuple index
+     *
+     * @param startIndex the new index to start from
+     */
+    void setAt(int startIndex) throws HyracksDataException;
+
+    /**
+     * Finds the tuple index given the search key
+     *
+     * @param searchKey search key
+     * @param cmp       comparator
+     * @param ftm       find tuple mode
+     * @param ftp       find tuple policy
+     * @return index of the tuple
+     */
+    int findTupleIndex(ITupleReference searchKey, MultiComparator cmp, FindTupleMode ftm,
+            FindTupleNoExactMatchPolicy ftp) throws HyracksDataException;
 
     /**
      * Mark {@link IColumnTupleIterator} as consumed
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
index 683e099..db878c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
@@ -20,7 +20,10 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.btree.api.IDiskBTreeStatefulPointSearchCursor;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import org.apache.hyracks.storage.common.IIndexCursorStats;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -56,7 +59,29 @@
 
     @Override
     public void setCursorToNextKey(ISearchPredicate searchPred) throws HyracksDataException {
-        initCursorPosition(searchPred);
+        int index = getLowKeyIndex();
+        if (index == frame.getTupleCount()) {
+            frameTuple.consume();
+            yieldFirstCall = false;
+            return;
+        }
+        frameTuple.setAt(index);
+        yieldFirstCall = true;
+    }
+
+    @Override
+    protected void setSearchPredicate(ISearchPredicate searchPred) {
+        pred = (RangePredicate) searchPred;
+        lowKey = pred.getLowKey();
+        lowKeyFtm = FindTupleMode.EXACT;
+        lowKeyFtp = FindTupleNoExactMatchPolicy.NONE;
+        reusablePredicate.setLowKeyComparator(originalKeyCmp);
+    }
+
+    @Override
+    protected int getLowKeyIndex() throws HyracksDataException {
+        int index = frameTuple.findTupleIndex(pred.getLowKey(), pred.getLowKeyComparator(), lowKeyFtm, lowKeyFtp);
+        return index < 0 ? frame.getTupleCount() : index;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index 3323494..e618aaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -24,6 +24,8 @@
 import org.apache.hyracks.storage.am.btree.impls.BTreeCursorInitialState;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.common.EnforcedIndexCursor;
@@ -53,6 +55,11 @@
     protected RangePredicate pred;
     protected ITupleReference lowKey;
     protected ITupleReference highKey;
+
+    protected FindTupleMode lowKeyFtm;
+    protected FindTupleMode highKeyFtm;
+    protected FindTupleNoExactMatchPolicy lowKeyFtp;
+    protected FindTupleNoExactMatchPolicy highKeyFtp;
     protected boolean yieldFirstCall;
 
     protected final IIndexCursorStats stats;
@@ -67,7 +74,7 @@
     }
 
     @Override
-    public void doDestroy() throws HyracksDataException {
+    public void doDestroy() {
         // No Op all resources are released in the close call
     }
 
@@ -84,7 +91,8 @@
             bufferCache.unpin(page0);
             page0 = nextLeaf;
             frame.setPage(page0);
-            frameTuple.reset(0);
+            frameTuple.newPage();
+            setCursorPosition();
             nextLeafPage = frame.getNextLeaf();
         } while (frame.getTupleCount() == 0 && nextLeafPage > 0);
     }
@@ -120,20 +128,28 @@
         pageId = ((BTreeCursorInitialState) initialState).getPageId();
         frame.setPage(page0);
         frame.setMultiComparator(originalKeyCmp);
-        frameTuple.reset(0);
+        frameTuple.newPage();
         initCursorPosition(searchPred);
     }
 
     protected void initCursorPosition(ISearchPredicate searchPred) throws HyracksDataException {
-        pred = (RangePredicate) searchPred;
-        lowKey = pred.getLowKey();
-        highKey = pred.getHighKey();
-
+        setSearchPredicate(searchPred);
         reusablePredicate.setLowKeyComparator(originalKeyCmp);
         reusablePredicate.setHighKeyComparator(pred.getHighKeyComparator());
         reusablePredicate.setHighKey(pred.getHighKey(), pred.isHighKeyInclusive());
         yieldFirstCall = false;
-        advanceTupleToLowKey();
+        setCursorPosition();
+    }
+
+    private void setCursorPosition() throws HyracksDataException {
+        int start = getLowKeyIndex();
+        int end = getHighKeyIndex();
+        if (end < start) {
+            frameTuple.consume();
+            return;
+        }
+        frameTuple.reset(start, end);
+        yieldFirstCall = shouldYieldFirstCall();
     }
 
     protected boolean isNextIncluded() throws HyracksDataException {
@@ -151,41 +167,6 @@
         return highKey == null || isLessOrEqual(frameTuple, highKey, pred.isHighKeyInclusive());
     }
 
-    protected void advanceTupleToLowKey() throws HyracksDataException {
-        if (highKey != null && isLessOrEqual(highKey, frame.getLeftmostTuple(), !pred.isHighKeyInclusive())
-                || lowKey != null && isLessOrEqual(frame.getRightmostTuple(), lowKey, !pred.isLowKeyInclusive())) {
-            /*
-             * If
-             * - The Lowest key from the frame is greater than the requested highKey
-             * OR
-             * - The highest key from the frame is less than the requested lowKey
-             * Then:
-             * No tuple will satisfy the search key. Consume the frameTuple to stop the search.
-             */
-            frameTuple.consume();
-            return;
-        } else if (lowKey == null) {
-            // no lowKey was specified, start from tupleIndex = 0
-            return;
-        }
-
-        //The requested key is somewhere within the frame tuples
-        boolean stop = false;
-        int counter = 0;
-        while (!stop && !frameTuple.isConsumed()) {
-            frameTuple.next();
-            stop = isLessOrEqual(lowKey, frameTuple, pred.isLowKeyInclusive());
-            counter++;
-        }
-
-        // Only proceed if needed
-        yieldFirstCall = shouldYieldFirstCall();
-        // Advance all columns to the proper position if needed
-        if (yieldFirstCall && counter - 1 > 0) {
-            frameTuple.skip(counter - 1);
-        }
-    }
-
     protected boolean shouldYieldFirstCall() throws HyracksDataException {
         // Proceed if the highKey is null or the current tuple's key is less than (or equal) the highKey
         return highKey == null || isLessOrEqual(frameTuple, highKey, pred.isHighKeyInclusive());
@@ -205,6 +186,65 @@
         return cmp < 0 || inclusive && cmp == 0;
     }
 
+    protected int getLowKeyIndex() throws HyracksDataException {
+        if (lowKey == null) {
+            return 0;
+        } else if (isLessOrEqual(frame.getRightmostTuple(), lowKey, !pred.isLowKeyInclusive())) {
+            //The highest key from the frame is less than the requested lowKey
+            return frame.getTupleCount();
+        }
+
+        int index = frameTuple.findTupleIndex(lowKey, pred.getLowKeyComparator(), lowKeyFtm, lowKeyFtp);
+        if (pred.isLowKeyInclusive()) {
+            index++;
+        } else {
+            if (index < 0) {
+                index = frame.getTupleCount();
+            }
+        }
+
+        return index;
+    }
+
+    protected int getHighKeyIndex() throws HyracksDataException {
+        if (highKey == null) {
+            return frame.getTupleCount() - 1;
+        } else if (isLessOrEqual(highKey, frame.getLeftmostTuple(), !pred.isHighKeyInclusive())) {
+            return -1;
+        }
+
+        int index = frameTuple.findTupleIndex(highKey, pred.getHighKeyComparator(), highKeyFtm, highKeyFtp);
+        if (pred.isHighKeyInclusive()) {
+            if (index < 0) {
+                index = frame.getTupleCount() - 1;
+            } else {
+                index--;
+            }
+        }
+
+        return index;
+    }
+
+    protected void setSearchPredicate(ISearchPredicate searchPred) {
+        pred = (RangePredicate) searchPred;
+        lowKey = pred.getLowKey();
+        highKey = pred.getHighKey();
+
+        lowKeyFtm = FindTupleMode.EXCLUSIVE;
+        if (pred.isLowKeyInclusive()) {
+            lowKeyFtp = FindTupleNoExactMatchPolicy.LOWER_KEY;
+        } else {
+            lowKeyFtp = FindTupleNoExactMatchPolicy.HIGHER_KEY;
+        }
+
+        highKeyFtm = FindTupleMode.EXCLUSIVE;
+        if (pred.isHighKeyInclusive()) {
+            highKeyFtp = FindTupleNoExactMatchPolicy.HIGHER_KEY;
+        } else {
+            highKeyFtp = FindTupleNoExactMatchPolicy.LOWER_KEY;
+        }
+    }
+
     @Override
     public void doClose() throws HyracksDataException {
         frameTuple.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index d0e1e1d..6f95dbf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -42,6 +42,7 @@
     private final int numberOfPrimaryKeys;
     private int totalNumberOfMegaLeafNodes;
     private int numOfSkippedMegaLeafNodes;
+    private int endIndex;
     protected int tupleIndex;
 
     /**
@@ -78,47 +79,72 @@
     }
 
     @Override
-    public final void reset(int startIndex) throws HyracksDataException {
-        tupleIndex = startIndex;
+    public final void newPage() throws HyracksDataException {
+        tupleIndex = 0;
         ByteBuffer pageZero = frame.getBuffer();
         pageZero.clear();
         pageZero.position(HEADER_SIZE);
 
         int numberOfTuples = frame.getTupleCount();
-        //Start new page and check whether we should skip reading non-key columns or not
-        boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
 
         //Start primary keys
         for (int i = 0; i < numberOfPrimaryKeys; i++) {
             IColumnBufferProvider provider = primaryKeyBufferProviders[i];
             provider.reset(frame);
-            startPrimaryKey(provider, tupleIndex, i, numberOfTuples);
+            startPrimaryKey(provider, i, numberOfTuples);
         }
+    }
 
+    @Override
+    public final void reset(int startIndex, int endIndex) throws HyracksDataException {
+        tupleIndex = startIndex;
+        this.endIndex = endIndex;
+        ByteBuffer pageZero = frame.getBuffer();
+        int numberOfTuples = frame.getTupleCount();
+        //Start new page and check whether we should skip reading non-key columns or not
+        boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
+        setPrimaryKeysAt(startIndex, startIndex);
         if (readColumnPages) {
             for (int i = 0; i < buffersProviders.length; i++) {
                 IColumnBufferProvider provider = buffersProviders[i];
                 //Release previous pinned pages if any
                 provider.releaseAll();
                 provider.reset(frame);
-                startColumn(provider, tupleIndex, i, numberOfTuples);
+                startColumn(provider, i, numberOfTuples);
             }
+            // Skip until before startIndex (i.e. stop at startIndex - 1)
+            skip(startIndex);
         } else {
             numOfSkippedMegaLeafNodes++;
         }
         totalNumberOfMegaLeafNodes++;
     }
 
+    @Override
+    public final void setAt(int startIndex) throws HyracksDataException {
+        int skipCount = startIndex - tupleIndex;
+        tupleIndex = startIndex;
+        setPrimaryKeysAt(startIndex, skipCount);
+        // -1 because next would be called for all columns
+        skip(skipCount - 1);
+    }
+
+    protected abstract void setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException;
+
     protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples);
 
-    protected abstract void startPrimaryKey(IColumnBufferProvider bufferProvider, int startIndex, int ordinal,
-            int numberOfTuples) throws HyracksDataException;
+    protected abstract void startPrimaryKey(IColumnBufferProvider bufferProvider, int ordinal, int numberOfTuples)
+            throws HyracksDataException;
 
-    protected abstract void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal,
-            int numberOfTuples) throws HyracksDataException;
+    protected abstract void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
+            throws HyracksDataException;
 
     protected abstract void onNext() throws HyracksDataException;
 
+    protected final int getTupleCount() {
+        return frame.getTupleCount();
+    }
+
     @Override
     public final void next() throws HyracksDataException {
         onNext();
@@ -132,7 +158,7 @@
 
     @Override
     public final boolean isConsumed() {
-        return tupleIndex >= frame.getTupleCount();
+        return tupleIndex >= endIndex;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 8c8a550..de4c4a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -103,12 +103,6 @@
             cursor.next();
             matchingTupleCount++;
             ITupleReference tuple = cursor.getTuple();
-            if (tupleFilter != null) {
-                referenceFilterTuple.reset(tuple);
-                if (!tupleFilter.accept(referenceFilterTuple)) {
-                    continue;
-                }
-            }
             tb.reset();
 
             if (retainInput && retainMissing) {
@@ -124,7 +118,13 @@
                     tb.addFieldEndOffset();
                 }
             }
-            writeTupleToOutput(tuple);
+            ITupleReference projectedTuple = writeTupleToOutput(tuple);
+            if (tupleFilter != null) {
+                referenceFilterTuple.reset(projectedTuple);
+                if (!tupleFilter.accept(referenceFilterTuple)) {
+                    continue;
+                }
+            }
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
             if (outputLimit >= 0 && ++outputCount >= outputLimit) {
                 finished = true;