[ASTERIXDB-3025][HYR] Introduce ITupleProjector

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
The TupleProjector is a way to pushdown value accesses
for the internal datasets (similar to Parquet's).

The current default implementation is basically a no-op.
This patch introduces the mechanism for the columnar
format's tuple projector implementation.

Change-Id: I1fadcef70451a7616e021771a1110413de4fb711
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15465
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 553fb4c..fed180b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -66,6 +66,7 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
 
 /**
@@ -160,7 +161,8 @@
                 jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive(), propagateFilter,
                 nonFilterWriterFactory, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory, outputLimit,
                 unnestMap.getGenerateCallBackProceedResultVar(),
-                isPrimaryIndexPointSearch(op, context.getPhysicalOptimizationConfig()));
+                isPrimaryIndexPointSearch(op, context.getPhysicalOptimizationConfig()),
+                DefaultTupleProjectorFactory.INSTANCE);
         IOperatorDescriptor opDesc = btreeSearch.first;
         opDesc.setSourceLocation(unnestMap.getSourceLocation());
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 1d82b19..14d1a6a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -53,6 +53,7 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 
 public class DatasetDataSource extends DataSource {
 
@@ -135,7 +136,7 @@
                 return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null,
                         ((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true,
                         true, false, null, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory,
-                        outputLimit, false, false);
+                        outputLimit, false, false, DefaultTupleProjectorFactory.INSTANCE);
             default:
                 throw new AlgebricksException("Unknown datasource type");
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 4cc83ab..1cdbbea 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -169,6 +169,7 @@
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
 public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
 
@@ -542,7 +543,8 @@
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
             boolean propagateFilter, IMissingWriterFactory nonFilterWriterFactory, int[] minFilterFieldIndexes,
             int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
-            boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch) throws AlgebricksException {
+            boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory)
+            throws AlgebricksException {
         boolean isSecondary = true;
         Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                 dataset.getDatasetName(), dataset.getDatasetName());
@@ -601,12 +603,13 @@
                     ? new LSMBTreeBatchPointSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
                             highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
                             retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
-                            maxFilterFieldIndexes, tupleFilterFactory, outputLimit)
+                            maxFilterFieldIndexes, tupleFilterFactory, outputLimit, tupleProjectorFactory)
                     : new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
                             lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
                             nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
                             propagateFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
-                            proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
+                            proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
+                            tupleProjectorFactory);
         } else {
             btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
                     highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
@@ -859,7 +862,7 @@
      *
      * @param dataset
      * @return Number of elements that will be used to create a bloom filter per
-     *         dataset per partition
+     * dataset per partition
      * @throws AlgebricksException
      */
     public long getCardinalityPerPartitionHint(Dataset dataset) throws AlgebricksException {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
index c34a671..3c829e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -28,6 +28,8 @@
 
     public static final String INDEX_CURSOR_STATS = "INDEX_CURSOR_STATS";
 
+    public static final String TUPLE_PROJECTOR = "TUPLE_PROJECTOR";
+
     private HyracksConstants() {
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 2455625..d5a4e5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -29,6 +29,8 @@
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
 public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
@@ -52,6 +54,7 @@
     protected byte[] searchCallbackProceedResultTrueValue;
     protected final ITupleFilterFactory tupleFilterFactory;
     protected final long outputLimit;
+    protected final ITupleProjectorFactory tupleProjectorFactory;
 
     public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
@@ -61,7 +64,8 @@
             IMissingWriterFactory nonFilterWriterFactory) {
         this(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
-                maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, null, -1, false, null, null);
+                maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, null, -1, false, null, null,
+                DefaultTupleProjectorFactory.INSTANCE);
     }
 
     public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
@@ -71,7 +75,7 @@
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue) {
+            byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.retainInput = retainInput;
@@ -92,6 +96,7 @@
         this.appendOpCallbackProceedResult = appendOpCallbackProceedResult;
         this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
         this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
+        this.tupleProjectorFactory = tupleProjectorFactory;
     }
 
     @Override
@@ -102,7 +107,7 @@
                 lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
-                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue);
+                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index fb331bf..a56e305 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
@@ -30,9 +31,11 @@
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
 public class BTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePushable {
     protected final boolean lowKeyInclusive;
@@ -52,7 +55,7 @@
         this(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
                 minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
                 nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory, null, -1,
-                false, null, null);
+                false, null, null, DefaultTupleProjectorFactory.INSTANCE);
     }
 
     public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
@@ -62,11 +65,12 @@
             ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
+            byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory)
+            throws HyracksDataException {
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
-                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue);
+                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, projectorFactory);
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
         if (lowKeyFields != null && lowKeyFields.length > 0) {
@@ -111,7 +115,8 @@
 
     @Override
     protected void addAdditionalIndexAccessorParams(IIndexAccessParameters iap) throws HyracksDataException {
-        // No additional parameters are required for the B+Tree search case
+        //Set tuple projector to get the information about the pushed down value accesses (if supported by the index)
+        iap.getParameters().put(HyracksConstants.TUPLE_PROJECTOR, tupleProjector);
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index eab8c96..1fe91d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -44,6 +44,7 @@
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
@@ -54,6 +55,8 @@
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.projection.ITupleProjector;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 import org.apache.hyracks.util.IThreadStatsCollector;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -107,6 +110,7 @@
     protected final long outputLimit;
     protected long outputCount = 0;
     protected boolean finished;
+    protected final ITupleProjector tupleProjector;
 
     // no filter and limit pushdown
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
@@ -116,7 +120,7 @@
             IMissingWriterFactory nonFilterWriterFactory) throws HyracksDataException {
         this(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
-                nonFilterWriterFactory, null, -1, false, null, null);
+                nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE);
     }
 
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
@@ -125,7 +129,8 @@
             ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFactoryFactory, long outputLimit,
             boolean appendSearchCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
+            byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory)
+            throws HyracksDataException {
         this.ctx = ctx;
         this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         this.retainInput = retainInput;
@@ -162,6 +167,8 @@
         if (this.tupleFilterFactory != null && this.retainMissing) {
             throw new IllegalStateException("RetainMissing with tuple filter is not supported");
         }
+
+        tupleProjector = projectorFactory.createTupleProjector(ctx);
     }
 
     protected abstract ISearchPredicate createSearchPredicate();
@@ -349,10 +356,7 @@
 
     protected void writeTupleToOutput(ITupleReference tuple) throws IOException {
         try {
-            for (int i = 0; i < tuple.getFieldCount(); i++) {
-                dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
-                tb.addFieldEndOffset();
-            }
+            tupleProjector.project(tuple, dos, tb);
         } catch (Exception e) {
             throw e;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
new file mode 100644
index 0000000..00cb0c5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.impls;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.common.projection.ITupleProjector;
+
+class DefaultTupleProjector implements ITupleProjector {
+    public static final ITupleProjector INSTANCE = new DefaultTupleProjector();
+
+    private DefaultTupleProjector() {
+    }
+
+    @Override
+    public void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
+        for (int i = 0; i < tuple.getFieldCount(); i++) {
+            dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
+            tb.addFieldEndOffset();
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjectorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjectorFactory.java
new file mode 100644
index 0000000..092982d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjectorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.impls;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.projection.ITupleProjector;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+public class DefaultTupleProjectorFactory implements ITupleProjectorFactory {
+    private static final long serialVersionUID = -4525893018744087821L;
+    public static final DefaultTupleProjectorFactory INSTANCE = new DefaultTupleProjectorFactory();
+
+    private DefaultTupleProjectorFactory() {
+    }
+
+    @Override
+    public ITupleProjector createTupleProjector(IHyracksTaskContext context) throws HyracksDataException {
+        return DefaultTupleProjector.INSTANCE;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
index 3800d17..3e03e5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
 public class LSMBTreeBatchPointSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -38,10 +39,11 @@
             IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
             IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory,
-            long outputLimit) {
+            long outputLimit, ITupleProjectorFactory tupleProjectorFactory) {
         super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
-                maxFilterFieldIndexes, false, null, tupleFilterFactory, outputLimit, false, null, null);
+                maxFilterFieldIndexes, false, null, tupleFilterFactory, outputLimit, false, null, null,
+                tupleProjectorFactory);
     }
 
     @Override
@@ -51,7 +53,7 @@
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
                 lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, tupleFilterFactory,
-                outputLimit);
+                outputLimit, tupleProjectorFactory);
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 4e024c0..9b8c353 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
 public class LSMBTreeBatchPointSearchOperatorNodePushable extends BTreeSearchOperatorNodePushable {
 
@@ -49,11 +50,12 @@
             boolean highKeyInclusive, int[] minFilterKeyFields, int[] maxFilterKeyFields,
             IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
             IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory, long outputLimit) throws HyracksDataException {
+            ITupleFilterFactory tupleFilterFactory, long outputLimit, ITupleProjectorFactory tupleProjectorFactory)
+            throws HyracksDataException {
         super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
                 minFilterKeyFields, maxFilterKeyFields, indexHelperFactory, retainInput, retainMissing,
                 missingWriterFactory, searchCallbackFactory, false, null, tupleFilterFactory, outputLimit, false, null,
-                null);
+                null, tupleProjectorFactory);
         this.keyFields = lowKeyFields;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index a8a4252..e1c6f5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
 import org.apache.hyracks.storage.am.rtree.util.RTreeUtils;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -59,7 +60,7 @@
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, null, -1, appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
-                searchCallbackProceedResultTrueValue);
+                searchCallbackProceedResultTrueValue, DefaultTupleProjectorFactory.INSTANCE);
         if (keyFields != null && keyFields.length > 0) {
             searchKey = new PermutingFrameTupleReference();
             searchKey.setFieldPermutation(keyFields);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
new file mode 100644
index 0000000..8ca1a82
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.projection;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public interface ITupleProjector {
+    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjectorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjectorFactory.java
new file mode 100644
index 0000000..ff9ecf9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjectorFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.projection;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Tuple projector allows the data source to project the values needed before it pushed up
+ * to the upper operator.
+ */
+public interface ITupleProjectorFactory extends Serializable {
+    ITupleProjector createTupleProjector(IHyracksTaskContext context) throws HyracksDataException;
+}