diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index eb4bfd4..4bb1adf 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -443,7 +443,7 @@
         return plan;
     }
 
-    private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             ICompiledDmlStatement stmt) throws AlgebricksException {
@@ -464,7 +464,7 @@
         return leafOperator;
     }
 
-    private ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             List<String> additionalFilteringField, LogicalVariable unnestVar, ILogicalOperator topOp,
@@ -579,7 +579,7 @@
         return processReturningExpression(rootOperator, upsertOp, compiledUpsert, resultMetadata);
     }
 
-    private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
@@ -609,7 +609,7 @@
 
     // Stitches the translated operators for the returning expression into the query
     // plan.
-    private ILogicalOperator processReturningExpression(ILogicalOperator inputOperator,
+    protected ILogicalOperator processReturningExpression(ILogicalOperator inputOperator,
             InsertDeleteUpsertOperator insertOp, CompiledInsertStatement compiledInsert, IResultMetadata resultMetadata)
             throws AlgebricksException {
         Expression returnExpression = compiledInsert.getReturnExpression();
@@ -651,7 +651,7 @@
         return distResultOperator;
     }
 
-    private DatasetDataSource validateDatasetInfo(MetadataProvider metadataProvider, DataverseName dataverseName,
+    protected DatasetDataSource validateDatasetInfo(MetadataProvider metadataProvider, DataverseName dataverseName,
             String datasetName, SourceLocation sourceLoc) throws AlgebricksException {
         Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
         if (dataset == null) {
@@ -671,7 +671,7 @@
                 dataset.getDatasetDetails(), domain);
     }
 
-    private FileSplit getDefaultOutputFileLocation(ICcApplicationContext appCtx) throws AlgebricksException {
+    protected FileSplit getDefaultOutputFileLocation(ICcApplicationContext appCtx) throws AlgebricksException {
         String outputDir = System.getProperty("java.io.tmpDir");
         String filePath =
                 outputDir + System.getProperty("file.separator") + OUTPUT_FILE_PREFIX + outputFileID.incrementAndGet();
@@ -1760,7 +1760,7 @@
      *            the query plan.
      * @throws CompilationException
      */
-    private void eliminateSharedOperatorReferenceForPlan(ILogicalPlan plan) throws CompilationException {
+    protected void eliminateSharedOperatorReferenceForPlan(ILogicalPlan plan) throws CompilationException {
         for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
             Set<Mutable<ILogicalOperator>> opRefSet = new HashSet<>();
             eliminateSharedOperatorReference(opRef, opRefSet);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index d4a52fb..c4db4eb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -50,14 +51,17 @@
 
     private final Feed feed;
     private final List<FeedConnection> feedConnections;
+    private final ILangExtension.Language translatorLang;
 
     public FeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx,
             IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets,
             AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory,
-            Feed feed, final List<FeedConnection> feedConnections) throws HyracksDataException {
+            Feed feed, final List<FeedConnection> feedConnections, ILangExtension.Language translatorLang)
+            throws HyracksDataException {
         super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory);
         this.feed = feed;
         this.feedConnections = feedConnections;
+        this.translatorLang = translatorLang;
     }
 
     @Override
@@ -94,8 +98,8 @@
     @Override
     protected JobId compileAndStartJob(MetadataProvider mdProvider) throws HyracksDataException {
         try {
-            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
-                    FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc);
+            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = FeedOperations
+                    .buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc, translatorLang);
             JobSpecification feedJob = jobInfo.getLeft();
             feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
             // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 78a82d6..f8e0cee 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1986,7 +1986,7 @@
         declaredFunctions.add(fds);
     }
 
-    protected void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt,
+    public void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt,
             IStatementRewriter stmtRewriter) throws Exception {
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
         FunctionSignature signature = cfs.getFunctionSignature();
@@ -2985,7 +2985,7 @@
         }
     }
 
-    private void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt,
+    protected void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         StartFeedStatement sfs = (StartFeedStatement) stmt;
         SourceLocation sourceLoc = sfs.getSourceLocation();
@@ -3024,7 +3024,7 @@
                 }
                 listener = new FeedEventsListener(this, metadataProvider.getApplicationContext(), hcc, entityId,
                         datasets, null, FeedIntakeOperatorNodePushable.class.getSimpleName(),
-                        NoRetryPolicyFactory.INSTANCE, feed, feedConnections);
+                        NoRetryPolicyFactory.INSTANCE, feed, feedConnections, compilationProvider.getLanguage());
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             committed = true;
@@ -3039,7 +3039,7 @@
         }
     }
 
-    private void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
+    protected void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
         StopFeedStatement sfst = (StopFeedStatement) stmt;
         SourceLocation sourceLoc = sfst.getSourceLocation();
         DataverseName dataverseName = getActiveDataverseName(sfst.getDataverseName());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 20a97ed..68234b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.result.ResponsePrinter;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -450,7 +451,8 @@
 
     public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
             MetadataProvider metadataProvider, Feed feed, List<FeedConnection> feedConnections,
-            IStatementExecutor statementExecutor, IHyracksClientConnection hcc) throws Exception {
+            IStatementExecutor statementExecutor, IHyracksClientConnection hcc, ILangExtension.Language translatorLang)
+            throws Exception {
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
         Pair<JobSpecification, ITypedAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
         List<JobSpecification> jobsList = new ArrayList<>();
@@ -465,8 +467,9 @@
         metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
                 StringUtils.join(ingestionLocations, ','));
         // TODO: Once we deprecated AQL, this extra queryTranslator can be removed.
-        IStatementExecutor translator =
-                getSQLPPTranslator(metadataProvider, ((QueryTranslator) statementExecutor).getSessionOutput());
+        IStatementExecutor translator = translatorLang == ILangExtension.Language.AQL
+                ? getSQLPPTranslator(metadataProvider, ((QueryTranslator) statementExecutor).getSessionOutput())
+                : statementExecutor;
         // Add connection job
         for (FeedConnection feedConnection : feedConnections) {
             JobSpecification connectionJob =
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 963ba7c..b23b3cf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -107,7 +107,7 @@
         // Add event listener
         ActiveEntityEventsListener eventsListener = new DummyFeedEventsListener(statementExecutor, appCtx, null,
                 entityId, datasetList, partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName(),
-                NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList());
+                NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList(), Language.SQLPP);
         // Register mock runtime
         NCAppRuntimeContext nc1AppCtx =
                 (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index 88f1332..884d209 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.active.FeedEventsListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
@@ -43,9 +44,10 @@
     public DummyFeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx,
             IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets,
             AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory,
-            Feed feed, List<FeedConnection> feedConnections) throws HyracksDataException {
+            Feed feed, List<FeedConnection> feedConnections, ILangExtension.Language translatorLang)
+            throws HyracksDataException {
         super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory, feed,
-                feedConnections);
+                feedConnections, translatorLang);
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 5e4b5a1..2ad2f7a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -48,11 +48,11 @@
 public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     public static final String KEY_INDEX = "Index";
-    private final boolean isPrimary;
-    private final SourceLocation sourceLoc;
+    protected final boolean isPrimary;
+    protected final SourceLocation sourceLoc;
     // This class has both lsmIndex and index (in super class) pointing to the same object
     private AbstractLSMIndex lsmIndex;
-    private int i = 0;
+    protected int i = 0;
 
     /**
      * The following three variables are used to keep track of the information regarding flushing partial frame such as
@@ -64,9 +64,9 @@
      * ==> captured in currentTupleIdx variable
      * These variables are reset for each frame, i.e., whenever nextFrame() is called, these variables are reset.
      */
-    private boolean flushedPartialTuples;
-    private int currentTupleIdx;
-    private int lastFlushedTupleIdx;
+    protected boolean flushedPartialTuples;
+    protected int currentTupleIdx;
+    protected int lastFlushedTupleIdx;
 
     public LSMInsertDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
             RecordDescriptor inputRecDesc, IndexOperation op, boolean isPrimary,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
index 45661e4..2226ca0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
@@ -33,7 +33,7 @@
 public class LSMTreeInsertDeleteOperatorDescriptor extends LSMTreeIndexInsertUpdateDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final boolean isPrimary;
+    protected final boolean isPrimary;
 
     public LSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory indexHelperFactory,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 2933756..8a16014 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -131,12 +131,14 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -975,10 +977,18 @@
                 fieldPermutation[i++] = idx;
             }
         }
-        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
+        return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
                 context.getMissingWriterFactory());
     }
 
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp(JobSpecification spec,
+            MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc,
+            int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException {
+        // this can be used by extensions to pick up their own operators
+        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, inputRecordDesc, fieldPermutation,
+                missingWriterFactory);
+    }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
             JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -1120,17 +1130,34 @@
                     pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
                             primaryKeySplitsAndConstraint.first);
                 }
-                op = new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
                         modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields);
 
             } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
+                op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
                         null, true, modificationCallbackFactory);
             }
         }
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
+    protected LSMPrimaryInsertOperatorDescriptor createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec,
+            RecordDescriptor inputRecordDesc, int[] fieldPermutation, IIndexDataflowHelperFactory idfh,
+            IIndexDataflowHelperFactory pkidfh, IModificationOperationCallbackFactory modificationCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int numKeys, int[] filterFields) {
+        // this can be used by extensions to pick up their own operators
+        return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields);
+    }
+
+    protected LSMTreeInsertDeleteOperatorDescriptor createLSMTreeInsertDeleteOperatorDescriptor(
+            IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, IndexOperation op,
+            IIndexDataflowHelperFactory indexHelperFactory, ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
+            IModificationOperationCallbackFactory modCallbackFactory) {
+        return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc, fieldPermutation, op, indexHelperFactory,
+                tupleFilterFactory, isPrimary, modCallbackFactory);
+    }
+
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
             IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> dataSourceIndex,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
index 42b8f29..d639f3d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -34,10 +34,10 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final IIndexDataflowHelperFactory keyIndexHelperFactory;
-    private final ISearchOperationCallbackFactory searchOpCallbackFactory;
-    private final int numOfPrimaryKeys;
-    private final int[] filterFields;
+    protected final IIndexDataflowHelperFactory keyIndexHelperFactory;
+    protected final ISearchOperationCallbackFactory searchOpCallbackFactory;
+    protected final int numOfPrimaryKeys;
+    protected final int[] filterFields;
 
     public LSMPrimaryInsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 0bb42d4..a886161 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -105,7 +105,17 @@
             }
         }
         keyTuple = new PermutingFrameTupleReference(searchKeyPermutations);
-        processor = new IFrameTupleProcessor() {
+        processor = createTupleProcessor(sourceLoc);
+        frameOpCallback = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, lsmAccessor);
+    }
+
+    protected void beforeModification(ITupleReference tuple) {
+        // this is used for extensions to modify tuples before persistence
+        // do nothing in the master branch
+    }
+
+    protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) {
+        return new IFrameTupleProcessor() {
             @Override
             public void process(ITupleReference tuple, int index) throws HyracksDataException {
                 if (index < currentTupleIdx) {
@@ -127,6 +137,7 @@
                     cursor.close();
                 }
                 if (!duplicate) {
+                    beforeModification(tuple);
                     lsmAccessor.forceUpsert(tuple);
                     if (lsmAccessorForKeyIndex != null) {
                         lsmAccessorForKeyIndex.forceUpsert(keyTuple);
@@ -157,8 +168,6 @@
                 // no op
             }
         };
-
-        frameOpCallback = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, lsmAccessor);
     }
 
     @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
index 7587ca69..43f54f2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
@@ -36,13 +36,13 @@
 public class LSMPrimaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    private final ARecordType recordType;
-    private final int filterIndex;
-    private ISearchOperationCallbackFactory searchOpCallbackFactory;
-    private final int numPrimaryKeys;
-    private final IMissingWriterFactory missingWriterFactory;
-    private final boolean hasSecondaries;
+    protected final IFrameOperationCallbackFactory frameOpCallbackFactory;
+    protected final ARecordType recordType;
+    protected final int filterIndex;
+    protected ISearchOperationCallbackFactory searchOpCallbackFactory;
+    protected final int numPrimaryKeys;
+    protected final IMissingWriterFactory missingWriterFactory;
+    protected final boolean hasSecondaries;
 
     public LSMPrimaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3e43e02..316665d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -83,17 +83,17 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private static final ThreadLocal<DateFormat> DATE_FORMAT =
             ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
-    private final PermutingFrameTupleReference key;
+    protected final PermutingFrameTupleReference key;
     private MultiComparator keySearchCmp;
     private ArrayTupleBuilder missingTupleBuilder;
     private final IMissingWriter missingWriter;
-    private ArrayTupleBuilder tb;
+    protected ArrayTupleBuilder tb;
     private DataOutput dos;
-    private RangePredicate searchPred;
-    private IIndexCursor cursor;
-    private ITupleReference prevTuple;
-    private final int numOfPrimaryKeys;
-    boolean isFiltered = false;
+    protected RangePredicate searchPred;
+    protected IIndexCursor cursor;
+    protected ITupleReference prevTuple;
+    protected final int numOfPrimaryKeys;
+    protected boolean isFiltered = false;
     private final ArrayTupleReference prevTupleWithFilter = new ArrayTupleReference();
     private ArrayTupleBuilder prevRecWithPKWithFilterValue;
     private ARecordType recordType;
@@ -103,13 +103,13 @@
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
-    private LockThenSearchOperationCallback searchCallback;
-    private IFrameOperationCallback frameOpCallback;
+    protected LockThenSearchOperationCallback searchCallback;
+    protected IFrameOperationCallback frameOpCallback;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    private AbstractIndexModificationOperationCallback abstractModCallback;
+    protected AbstractIndexModificationOperationCallback abstractModCallback;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
     private final IFrameTupleProcessor processor;
-    private LSMTreeIndexAccessor lsmAccessor;
+    protected LSMTreeIndexAccessor lsmAccessor;
     private final ITracer tracer;
     private final long traceCategory;
     private long lastRecordInTimeStamp = 0L;
@@ -144,7 +144,18 @@
             this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
-        processor = new IFrameTupleProcessor() {
+        processor = createTupleProcessor(hasSecondaries);
+        tracer = ctx.getJobletContext().getServiceContext().getTracer();
+        traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
+    }
+
+    protected void beforeModification(ITupleReference tuple) {
+        // this is used for extensions to modify tuples before persistence
+        // do nothing in the master branch
+    }
+
+    protected IFrameTupleProcessor createTupleProcessor(final boolean hasSecondaries) {
+        return new IFrameTupleProcessor() {
             @Override
             public void process(ITupleReference tuple, int index) throws HyracksDataException {
                 try {
@@ -176,6 +187,7 @@
                         appendUpsertIndicator(!isDelete);
                         appendPreviousTupleAsMissing();
                     }
+                    beforeModification(tuple);
                     if (isDelete && prevTuple != null) {
                         // Only delete if it is a delete and not upsert
                         // And previous tuple with the same key was found
@@ -213,8 +225,6 @@
                 frameOpCallback.fail(th);
             }
         };
-        tracer = ctx.getJobletContext().getServiceContext().getTracer();
-        traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
     }
 
     // we have the permutation which has [pk locations, record location, optional:filter-location]
@@ -285,12 +295,12 @@
         }
     }
 
-    private void resetSearchPredicate(int tupleIndex) {
+    protected void resetSearchPredicate(int tupleIndex) {
         key.reset(accessor, tupleIndex);
         searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp);
     }
 
-    private void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
+    protected void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
         if (recordWasInserted || recordWasDeleted) {
             frameTuple.reset(accessor, tupleIndex);
             for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -307,7 +317,7 @@
         }
     }
 
-    private static boolean isDeleteOperation(ITupleReference t1, int field) {
+    protected static boolean isDeleteOperation(ITupleReference t1, int field) {
         return TypeTagUtil.isType(t1, field, ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
     }
 
@@ -326,7 +336,7 @@
         }
     }
 
-    private void appendFilterToOutput() throws IOException {
+    protected void appendFilterToOutput() throws IOException {
         // if with filters, append the filter
         if (isFiltered) {
             dos.write(prevTuple.getFieldData(filterFieldIndex), prevTuple.getFieldStart(filterFieldIndex),
@@ -335,18 +345,18 @@
         }
     }
 
-    private void appendUpsertIndicator(boolean isUpsert) throws IOException {
+    protected void appendUpsertIndicator(boolean isUpsert) throws IOException {
         recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE : ABoolean.FALSE, dos);
         tb.addFieldEndOffset();
     }
 
-    private void appendPrevRecord() throws IOException {
+    protected void appendPrevRecord() throws IOException {
         dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
                 prevTuple.getFieldLength(numOfPrimaryKeys));
         tb.addFieldEndOffset();
     }
 
-    private void appendPreviousMeta() throws IOException {
+    protected void appendPreviousMeta() throws IOException {
         // if has meta, then append meta
         if (hasMeta) {
             dos.write(prevTuple.getFieldData(metaFieldIndex), prevTuple.getFieldStart(metaFieldIndex),
@@ -355,7 +365,7 @@
         }
     }
 
-    private void appendPreviousTupleAsMissing() throws IOException {
+    protected void appendPreviousTupleAsMissing() throws IOException {
         prevTuple = null;
         writeMissingField();
         if (hasMeta) {
@@ -376,7 +386,7 @@
         appender.write(writer, true);
     }
 
-    private void appendFilterToPrevTuple() throws IOException {
+    protected void appendFilterToPrevTuple() throws IOException {
         if (isFiltered) {
             prevRecWithPKWithFilterValue.reset();
             for (int i = 0; i < prevTuple.getFieldCount(); i++) {
