[NO ISSUE] Adding more extension APIs

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
1. Allow extensions to override LangExpressionToPlanTranslator to
translate insert/upsert/delete.
2. Reuse the current statement executor in feed if possible to make sure
extensions' translator can be used properly.
3. Add new methods in MetadataProvider so that extensions could override
them to use customized LSM runtime operators.
4. Add new operational interface in LSM runtime to allow extensions to
modify data before persistence.

Change-Id: Iada557f15af9de6fbdb6f6d4e41c0266c1d3fbff
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7643
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index eb4bfd4..4bb1adf 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -443,7 +443,7 @@
         return plan;
     }
 
-    private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             ICompiledDmlStatement stmt) throws AlgebricksException {
@@ -464,7 +464,7 @@
         return leafOperator;
     }
 
-    private ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             List<String> additionalFilteringField, LogicalVariable unnestVar, ILogicalOperator topOp,
@@ -579,7 +579,7 @@
         return processReturningExpression(rootOperator, upsertOp, compiledUpsert, resultMetadata);
     }
 
-    private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+    protected ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
@@ -609,7 +609,7 @@
 
     // Stitches the translated operators for the returning expression into the query
     // plan.
-    private ILogicalOperator processReturningExpression(ILogicalOperator inputOperator,
+    protected ILogicalOperator processReturningExpression(ILogicalOperator inputOperator,
             InsertDeleteUpsertOperator insertOp, CompiledInsertStatement compiledInsert, IResultMetadata resultMetadata)
             throws AlgebricksException {
         Expression returnExpression = compiledInsert.getReturnExpression();
@@ -651,7 +651,7 @@
         return distResultOperator;
     }
 
-    private DatasetDataSource validateDatasetInfo(MetadataProvider metadataProvider, DataverseName dataverseName,
+    protected DatasetDataSource validateDatasetInfo(MetadataProvider metadataProvider, DataverseName dataverseName,
             String datasetName, SourceLocation sourceLoc) throws AlgebricksException {
         Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
         if (dataset == null) {
@@ -671,7 +671,7 @@
                 dataset.getDatasetDetails(), domain);
     }
 
-    private FileSplit getDefaultOutputFileLocation(ICcApplicationContext appCtx) throws AlgebricksException {
+    protected FileSplit getDefaultOutputFileLocation(ICcApplicationContext appCtx) throws AlgebricksException {
         String outputDir = System.getProperty("java.io.tmpDir");
         String filePath =
                 outputDir + System.getProperty("file.separator") + OUTPUT_FILE_PREFIX + outputFileID.incrementAndGet();
@@ -1760,7 +1760,7 @@
      *            the query plan.
      * @throws CompilationException
      */
-    private void eliminateSharedOperatorReferenceForPlan(ILogicalPlan plan) throws CompilationException {
+    protected void eliminateSharedOperatorReferenceForPlan(ILogicalPlan plan) throws CompilationException {
         for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
             Set<Mutable<ILogicalOperator>> opRefSet = new HashSet<>();
             eliminateSharedOperatorReference(opRef, opRefSet);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index d4a52fb..c4db4eb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -50,14 +51,17 @@
 
     private final Feed feed;
     private final List<FeedConnection> feedConnections;
+    private final ILangExtension.Language translatorLang;
 
     public FeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx,
             IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets,
             AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory,
-            Feed feed, final List<FeedConnection> feedConnections) throws HyracksDataException {
+            Feed feed, final List<FeedConnection> feedConnections, ILangExtension.Language translatorLang)
+            throws HyracksDataException {
         super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory);
         this.feed = feed;
         this.feedConnections = feedConnections;
+        this.translatorLang = translatorLang;
     }
 
     @Override
@@ -94,8 +98,8 @@
     @Override
     protected JobId compileAndStartJob(MetadataProvider mdProvider) throws HyracksDataException {
         try {
-            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
-                    FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc);
+            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = FeedOperations
+                    .buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc, translatorLang);
             JobSpecification feedJob = jobInfo.getLeft();
             feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
             // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 78a82d6..f8e0cee 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1986,7 +1986,7 @@
         declaredFunctions.add(fds);
     }
 
-    protected void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt,
+    public void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt,
             IStatementRewriter stmtRewriter) throws Exception {
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
         FunctionSignature signature = cfs.getFunctionSignature();
@@ -2985,7 +2985,7 @@
         }
     }
 
-    private void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt,
+    protected void handleStartFeedStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         StartFeedStatement sfs = (StartFeedStatement) stmt;
         SourceLocation sourceLoc = sfs.getSourceLocation();
@@ -3024,7 +3024,7 @@
                 }
                 listener = new FeedEventsListener(this, metadataProvider.getApplicationContext(), hcc, entityId,
                         datasets, null, FeedIntakeOperatorNodePushable.class.getSimpleName(),
-                        NoRetryPolicyFactory.INSTANCE, feed, feedConnections);
+                        NoRetryPolicyFactory.INSTANCE, feed, feedConnections, compilationProvider.getLanguage());
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             committed = true;
@@ -3039,7 +3039,7 @@
         }
     }
 
-    private void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
+    protected void handleStopFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
         StopFeedStatement sfst = (StopFeedStatement) stmt;
         SourceLocation sourceLoc = sfst.getSourceLocation();
         DataverseName dataverseName = getActiveDataverseName(sfst.getDataverseName());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 20a97ed..68234b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.result.ResponsePrinter;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.app.translator.QueryTranslator;
@@ -450,7 +451,8 @@
 
     public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
             MetadataProvider metadataProvider, Feed feed, List<FeedConnection> feedConnections,
-            IStatementExecutor statementExecutor, IHyracksClientConnection hcc) throws Exception {
+            IStatementExecutor statementExecutor, IHyracksClientConnection hcc, ILangExtension.Language translatorLang)
+            throws Exception {
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
         Pair<JobSpecification, ITypedAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
         List<JobSpecification> jobsList = new ArrayList<>();
@@ -465,8 +467,9 @@
         metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
                 StringUtils.join(ingestionLocations, ','));
         // TODO: Once we deprecated AQL, this extra queryTranslator can be removed.
-        IStatementExecutor translator =
-                getSQLPPTranslator(metadataProvider, ((QueryTranslator) statementExecutor).getSessionOutput());
+        IStatementExecutor translator = translatorLang == ILangExtension.Language.AQL
+                ? getSQLPPTranslator(metadataProvider, ((QueryTranslator) statementExecutor).getSessionOutput())
+                : statementExecutor;
         // Add connection job
         for (FeedConnection feedConnection : feedConnections) {
             JobSpecification connectionJob =
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 963ba7c..b23b3cf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -107,7 +107,7 @@
         // Add event listener
         ActiveEntityEventsListener eventsListener = new DummyFeedEventsListener(statementExecutor, appCtx, null,
                 entityId, datasetList, partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName(),
-                NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList());
+                NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList(), Language.SQLPP);
         // Register mock runtime
         NCAppRuntimeContext nc1AppCtx =
                 (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index 88f1332..884d209 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.active.FeedEventsListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
@@ -43,9 +44,10 @@
     public DummyFeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx,
             IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets,
             AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory,
-            Feed feed, List<FeedConnection> feedConnections) throws HyracksDataException {
+            Feed feed, List<FeedConnection> feedConnections, ILangExtension.Language translatorLang)
+            throws HyracksDataException {
         super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory, feed,
-                feedConnections);
+                feedConnections, translatorLang);
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 5e4b5a1..2ad2f7a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -48,11 +48,11 @@
 public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     public static final String KEY_INDEX = "Index";
-    private final boolean isPrimary;
-    private final SourceLocation sourceLoc;
+    protected final boolean isPrimary;
+    protected final SourceLocation sourceLoc;
     // This class has both lsmIndex and index (in super class) pointing to the same object
     private AbstractLSMIndex lsmIndex;
-    private int i = 0;
+    protected int i = 0;
 
     /**
      * The following three variables are used to keep track of the information regarding flushing partial frame such as
@@ -64,9 +64,9 @@
      * ==> captured in currentTupleIdx variable
      * These variables are reset for each frame, i.e., whenever nextFrame() is called, these variables are reset.
      */
-    private boolean flushedPartialTuples;
-    private int currentTupleIdx;
-    private int lastFlushedTupleIdx;
+    protected boolean flushedPartialTuples;
+    protected int currentTupleIdx;
+    protected int lastFlushedTupleIdx;
 
     public LSMInsertDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
             RecordDescriptor inputRecDesc, IndexOperation op, boolean isPrimary,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
index 45661e4..2226ca0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
@@ -33,7 +33,7 @@
 public class LSMTreeInsertDeleteOperatorDescriptor extends LSMTreeIndexInsertUpdateDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final boolean isPrimary;
+    protected final boolean isPrimary;
 
     public LSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory indexHelperFactory,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 2933756..8a16014 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -131,12 +131,14 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -975,10 +977,18 @@
                 fieldPermutation[i++] = idx;
             }
         }
-        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
+        return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
                 context.getMissingWriterFactory());
     }
 
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp(JobSpecification spec,
+            MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc,
+            int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException {
+        // this can be used by extensions to pick up their own operators
+        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, inputRecordDesc, fieldPermutation,
+                missingWriterFactory);
+    }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
             JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -1120,17 +1130,34 @@
                     pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
                             primaryKeySplitsAndConstraint.first);
                 }
-                op = new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
                         modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields);
 
             } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
+                op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
                         null, true, modificationCallbackFactory);
             }
         }
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
+    protected LSMPrimaryInsertOperatorDescriptor createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec,
+            RecordDescriptor inputRecordDesc, int[] fieldPermutation, IIndexDataflowHelperFactory idfh,
+            IIndexDataflowHelperFactory pkidfh, IModificationOperationCallbackFactory modificationCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int numKeys, int[] filterFields) {
+        // this can be used by extensions to pick up their own operators
+        return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields);
+    }
+
+    protected LSMTreeInsertDeleteOperatorDescriptor createLSMTreeInsertDeleteOperatorDescriptor(
+            IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, IndexOperation op,
+            IIndexDataflowHelperFactory indexHelperFactory, ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
+            IModificationOperationCallbackFactory modCallbackFactory) {
+        return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc, fieldPermutation, op, indexHelperFactory,
+                tupleFilterFactory, isPrimary, modCallbackFactory);
+    }
+
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
             IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> dataSourceIndex,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
index 42b8f29..d639f3d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -34,10 +34,10 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final IIndexDataflowHelperFactory keyIndexHelperFactory;
-    private final ISearchOperationCallbackFactory searchOpCallbackFactory;
-    private final int numOfPrimaryKeys;
-    private final int[] filterFields;
+    protected final IIndexDataflowHelperFactory keyIndexHelperFactory;
+    protected final ISearchOperationCallbackFactory searchOpCallbackFactory;
+    protected final int numOfPrimaryKeys;
+    protected final int[] filterFields;
 
     public LSMPrimaryInsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 0bb42d4..a886161 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -105,7 +105,17 @@
             }
         }
         keyTuple = new PermutingFrameTupleReference(searchKeyPermutations);
-        processor = new IFrameTupleProcessor() {
+        processor = createTupleProcessor(sourceLoc);
+        frameOpCallback = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, lsmAccessor);
+    }
+
+    protected void beforeModification(ITupleReference tuple) {
+        // this is used for extensions to modify tuples before persistence
+        // do nothing in the master branch
+    }
+
+    protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) {
+        return new IFrameTupleProcessor() {
             @Override
             public void process(ITupleReference tuple, int index) throws HyracksDataException {
                 if (index < currentTupleIdx) {
@@ -127,6 +137,7 @@
                     cursor.close();
                 }
                 if (!duplicate) {
+                    beforeModification(tuple);
                     lsmAccessor.forceUpsert(tuple);
                     if (lsmAccessorForKeyIndex != null) {
                         lsmAccessorForKeyIndex.forceUpsert(keyTuple);
@@ -157,8 +168,6 @@
                 // no op
             }
         };
-
-        frameOpCallback = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, lsmAccessor);
     }
 
     @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
index 7587ca69..43f54f2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
@@ -36,13 +36,13 @@
 public class LSMPrimaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    private final ARecordType recordType;
-    private final int filterIndex;
-    private ISearchOperationCallbackFactory searchOpCallbackFactory;
-    private final int numPrimaryKeys;
-    private final IMissingWriterFactory missingWriterFactory;
-    private final boolean hasSecondaries;
+    protected final IFrameOperationCallbackFactory frameOpCallbackFactory;
+    protected final ARecordType recordType;
+    protected final int filterIndex;
+    protected ISearchOperationCallbackFactory searchOpCallbackFactory;
+    protected final int numPrimaryKeys;
+    protected final IMissingWriterFactory missingWriterFactory;
+    protected final boolean hasSecondaries;
 
     public LSMPrimaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3e43e02..316665d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -83,17 +83,17 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private static final ThreadLocal<DateFormat> DATE_FORMAT =
             ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
-    private final PermutingFrameTupleReference key;
+    protected final PermutingFrameTupleReference key;
     private MultiComparator keySearchCmp;
     private ArrayTupleBuilder missingTupleBuilder;
     private final IMissingWriter missingWriter;
-    private ArrayTupleBuilder tb;
+    protected ArrayTupleBuilder tb;
     private DataOutput dos;
-    private RangePredicate searchPred;
-    private IIndexCursor cursor;
-    private ITupleReference prevTuple;
-    private final int numOfPrimaryKeys;
-    boolean isFiltered = false;
+    protected RangePredicate searchPred;
+    protected IIndexCursor cursor;
+    protected ITupleReference prevTuple;
+    protected final int numOfPrimaryKeys;
+    protected boolean isFiltered = false;
     private final ArrayTupleReference prevTupleWithFilter = new ArrayTupleReference();
     private ArrayTupleBuilder prevRecWithPKWithFilterValue;
     private ARecordType recordType;
@@ -103,13 +103,13 @@
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
-    private LockThenSearchOperationCallback searchCallback;
-    private IFrameOperationCallback frameOpCallback;
+    protected LockThenSearchOperationCallback searchCallback;
+    protected IFrameOperationCallback frameOpCallback;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    private AbstractIndexModificationOperationCallback abstractModCallback;
+    protected AbstractIndexModificationOperationCallback abstractModCallback;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
     private final IFrameTupleProcessor processor;
-    private LSMTreeIndexAccessor lsmAccessor;
+    protected LSMTreeIndexAccessor lsmAccessor;
     private final ITracer tracer;
     private final long traceCategory;
     private long lastRecordInTimeStamp = 0L;
@@ -144,7 +144,18 @@
             this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
-        processor = new IFrameTupleProcessor() {
+        processor = createTupleProcessor(hasSecondaries);
+        tracer = ctx.getJobletContext().getServiceContext().getTracer();
+        traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
+    }
+
+    protected void beforeModification(ITupleReference tuple) {
+        // this is used for extensions to modify tuples before persistence
+        // do nothing in the master branch
+    }
+
+    protected IFrameTupleProcessor createTupleProcessor(final boolean hasSecondaries) {
+        return new IFrameTupleProcessor() {
             @Override
             public void process(ITupleReference tuple, int index) throws HyracksDataException {
                 try {
@@ -176,6 +187,7 @@
                         appendUpsertIndicator(!isDelete);
                         appendPreviousTupleAsMissing();
                     }
+                    beforeModification(tuple);
                     if (isDelete && prevTuple != null) {
                         // Only delete if it is a delete and not upsert
                         // And previous tuple with the same key was found
@@ -213,8 +225,6 @@
                 frameOpCallback.fail(th);
             }
         };
-        tracer = ctx.getJobletContext().getServiceContext().getTracer();
-        traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
     }
 
     // we have the permutation which has [pk locations, record location, optional:filter-location]
@@ -285,12 +295,12 @@
         }
     }
 
-    private void resetSearchPredicate(int tupleIndex) {
+    protected void resetSearchPredicate(int tupleIndex) {
         key.reset(accessor, tupleIndex);
         searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp);
     }
 
-    private void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
+    protected void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
         if (recordWasInserted || recordWasDeleted) {
             frameTuple.reset(accessor, tupleIndex);
             for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -307,7 +317,7 @@
         }
     }
 
-    private static boolean isDeleteOperation(ITupleReference t1, int field) {
+    protected static boolean isDeleteOperation(ITupleReference t1, int field) {
         return TypeTagUtil.isType(t1, field, ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
     }
 
@@ -326,7 +336,7 @@
         }
     }
 
-    private void appendFilterToOutput() throws IOException {
+    protected void appendFilterToOutput() throws IOException {
         // if with filters, append the filter
         if (isFiltered) {
             dos.write(prevTuple.getFieldData(filterFieldIndex), prevTuple.getFieldStart(filterFieldIndex),
@@ -335,18 +345,18 @@
         }
     }
 
-    private void appendUpsertIndicator(boolean isUpsert) throws IOException {
+    protected void appendUpsertIndicator(boolean isUpsert) throws IOException {
         recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE : ABoolean.FALSE, dos);
         tb.addFieldEndOffset();
     }
 
-    private void appendPrevRecord() throws IOException {
+    protected void appendPrevRecord() throws IOException {
         dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
                 prevTuple.getFieldLength(numOfPrimaryKeys));
         tb.addFieldEndOffset();
     }
 
-    private void appendPreviousMeta() throws IOException {
+    protected void appendPreviousMeta() throws IOException {
         // if has meta, then append meta
         if (hasMeta) {
             dos.write(prevTuple.getFieldData(metaFieldIndex), prevTuple.getFieldStart(metaFieldIndex),
@@ -355,7 +365,7 @@
         }
     }
 
-    private void appendPreviousTupleAsMissing() throws IOException {
+    protected void appendPreviousTupleAsMissing() throws IOException {
         prevTuple = null;
         writeMissingField();
         if (hasMeta) {
@@ -376,7 +386,7 @@
         appender.write(writer, true);
     }
 
-    private void appendFilterToPrevTuple() throws IOException {
+    protected void appendFilterToPrevTuple() throws IOException {
         if (isFiltered) {
             prevRecWithPKWithFilterValue.reset();
             for (int i = 0; i < prevTuple.getFieldCount(); i++) {