[ASTERIXDB-3383][RT] Introducing tuple filter to ingestion pipeline

   - user model changes: no
   - storage format changes: no
   - interface changes: no

   Details:

   - excludes the tuple from being processed
     based on the provided filter.

Change-Id: Id79607bdada1cd42949cccf43390a90dda092602
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18239
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index fc992e3..668ff13 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -52,6 +52,7 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.NoOpLSMTupleFilterCallbackFactory;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
@@ -843,7 +844,7 @@
                         frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory(mdProvider)
                                 : frameOpCallbackFactory,
                         MissingWriterFactory.INSTANCE, hasSecondaries, NoOpTupleProjectorFactory.INSTANCE,
-                        tuplePartitionerFactory, partitionsMap);
+                        tuplePartitionerFactory, partitionsMap, NoOpLSMTupleFilterCallbackFactory.INSTANCE);
         RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, metaType);
         // fix pk fields
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 85fbad1..9d41097 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -116,6 +116,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
 import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -877,4 +878,8 @@
     public DatasetFormatInfo getDatasetFormatInfo() {
         return datasetFormatInfo;
     }
+
+    public ILSMTupleFilterCallbackFactory getTupleFilterCallbackFactory() {
+        return NoOpLSMTupleFilterCallbackFactory.INSTANCE;
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallback.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallback.java
new file mode 100644
index 0000000..0957a96
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.entities;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallback;
+
+public class NoOpLSMTupleFilterCallback implements ILSMTupleFilterCallback {
+    private static final long serialVersionUID = 1L;
+    public static final NoOpLSMTupleFilterCallback INSTANCE = new NoOpLSMTupleFilterCallback();
+
+    @Override
+    public void initialize(ILSMIndex index) throws HyracksDataException {
+
+    }
+
+    @Override
+    public boolean filter(FrameTupleAccessor accessor, int tupleIdx) {
+        return false;
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallbackFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallbackFactory.java
new file mode 100644
index 0000000..7af24ac
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallbackFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.entities;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
+
+public class NoOpLSMTupleFilterCallbackFactory implements ILSMTupleFilterCallbackFactory {
+    private static final long serialVersionUID = 1L;
+    public static final NoOpLSMTupleFilterCallbackFactory INSTANCE = new NoOpLSMTupleFilterCallbackFactory();
+
+    @Override
+    public ILSMTupleFilterCallback createTupleFilterCallback(int[] fieldPermutation) {
+        return NoOpLSMTupleFilterCallback.INSTANCE;
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 43f40eb..becdbc1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -104,6 +104,7 @@
 import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
 import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -532,6 +533,9 @@
         }
         RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
 
+        // get the Tuple filter callback
+        ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory = dataset.getTupleFilterCallbackFactory();
+
         // This allows to project only the indexed fields instead of the entirety of the record
         ARecordType requestedType = getPrevRecordType(metadataProvider, dataset, itemType);
         ITupleProjectorFactory projectorFactory = IndexUtil.createUpsertTupleProjectorFactory(
@@ -543,7 +547,7 @@
                 missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
                 dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType,
                 fieldIdx, hasSecondaries, projectorFactory, tuplePartitionerFactory,
-                partitioningProperties.getComputeStorageMap());
+                partitioningProperties.getComputeStorageMap(), tupleFilterCallbackFactory);
         return new Pair<>(op, partitioningProperties.getConstraints());
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
index ee1d388..dda8a58 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
 import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
 public class LSMPrimaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
@@ -47,6 +48,7 @@
     protected final IMissingWriterFactory missingWriterFactory;
     protected final boolean hasSecondaries;
     private final ITupleProjectorFactory projectorFactory;
+    private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
 
     public LSMPrimaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
@@ -55,8 +57,8 @@
             ISearchOperationCallbackFactory searchOpCallbackFactory,
             IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, Integer filterSourceIndicator,
             ARecordType filterItemType, int filterIndex, boolean hasSecondaries,
-            ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory partitionerFactory,
-            int[][] partitionsMap) {
+            ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory partitionerFactory, int[][] partitionsMap,
+            ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory) {
         super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, null, true,
                 modificationOpCallbackFactory, partitionerFactory, partitionsMap);
         this.frameOpCallbackFactory = frameOpCallbackFactory;
@@ -68,6 +70,7 @@
         this.filterIndex = filterIndex;
         this.hasSecondaries = hasSecondaries;
         this.projectorFactory = projectorFactory;
+        this.tupleFilterCallbackFactory = tupleFilterCallbackFactory;
     }
 
     @Override
@@ -77,6 +80,6 @@
         return new LSMPrimaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
                 intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, filterSourceIndicator,
                 filterItemType, filterIndex, frameOpCallbackFactory, missingWriterFactory, hasSecondaries,
-                projectorFactory, tuplePartitionerFactory, partitionsMap);
+                projectorFactory, tuplePartitionerFactory, partitionsMap, tupleFilterCallbackFactory);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 34371e1..8bc2b1c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -77,6 +77,8 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -131,6 +133,7 @@
     private final int metaFieldIndex;
     protected final ISearchOperationCallback[] searchCallbacks;
     protected final IFrameOperationCallback[] frameOpCallbacks;
+    protected final ILSMTupleFilterCallback[] tupleFilterCallbacks;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
     private final IFrameTupleProcessor[] processors;
@@ -140,6 +143,8 @@
     private long lastRecordInTimeStamp = 0L;
     private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
     private final boolean hasSecondaries;
+    private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
+    private final int[] fieldPermutation;
 
     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -147,12 +152,15 @@
             ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, Integer filterSourceIndicator,
             ARecordType filterItemType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
             IMissingWriterFactory missingWriterFactory, boolean hasSecondaries, ITupleProjectorFactory projectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap,
+            ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
                 modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
+        this.fieldPermutation = fieldPermutation;
         this.hasSecondaries = hasSecondaries;
         this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
         this.searchCallbacks = new ISearchOperationCallback[partitions.length];
+        this.tupleFilterCallbacks = new ILSMTupleFilterCallback[partitions.length];
         this.cursors = new IIndexCursor[partitions.length];
         this.processors = new IFrameTupleProcessor[partitions.length];
         this.key = new PermutingFrameTupleReference();
@@ -179,6 +187,7 @@
         tracer = ctx.getJobletContext().getServiceContext().getTracer();
         traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
         tupleProjector = projectorFactory.createTupleProjector(ctx);
+        this.tupleFilterCallbackFactory = tupleFilterCallbackFactory;
     }
 
     protected void beforeModification(ITupleReference tuple) {
@@ -305,6 +314,8 @@
                 indexHelpersOpen[i] = true;
                 indexHelper.open();
                 indexes[i] = indexHelper.getIndexInstance();
+                tupleFilterCallbacks[i] = tupleFilterCallbackFactory.createTupleFilterCallback(fieldPermutation);
+                tupleFilterCallbacks[i].initialize((ILSMIndex) indexes[i]);
                 if (((ILSMIndex) indexes[i]).isAtomic()) {
                     ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
                 }
@@ -410,6 +421,9 @@
         int itemCount = accessor.getTupleCount();
         for (int i = 0; i < itemCount; i++) {
             int storagePartition = tuplePartitioner.partition(accessor, i);
+            if (tupleFilterCallbacks[storagePartitionId2Index.get(storagePartition)].filter(accessor, i)) {
+                continue;
+            }
             int pIdx = storagePartitionId2Index.get(storagePartition);
             IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k -> new IntOpenHashSet());
             tupleIndexes.add(i);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallback.java
new file mode 100644
index 0000000..05a9e1a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public interface ILSMTupleFilterCallback extends Serializable {
+    /**
+     * Populates the state within the filter callback by fetching the state from the provided "index"
+     * and mapping it with the storage partition index, which will be utilized for filtering
+     * the tuple directed to the index "storagePartitionIdx".
+     */
+    void initialize(ILSMIndex index) throws HyracksDataException;
+
+    /**
+     * Filter the received record based on the initialized ingestion state.
+     */
+    boolean filter(FrameTupleAccessor accessor, int tupleIdx);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallbackFactory.java
new file mode 100644
index 0000000..dc271fd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallbackFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+public interface ILSMTupleFilterCallbackFactory extends Serializable {
+    /**
+     * Creates a callback function that utilizes the field permutation of the incoming tuple.
+     * The field tuple contains information about the position of the record, meta, and primary key,
+     * facilitating the extraction of relevant information for filtering purposes.
+     */
+    ILSMTupleFilterCallback createTupleFilterCallback(int[] fieldPermutation);
+}