[ASTERIXDB-3383][RT] Introducing tuple filter to ingestion pipeline
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Adds a tuple filter callback to the primary upsert operator so that
tuples rejected by the provided filter are excluded from processing.
Change-Id: Id79607bdada1cd42949cccf43390a90dda092602
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18239
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index fc992e3..668ff13 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -52,6 +52,7 @@
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.NoOpLSMTupleFilterCallbackFactory;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.ARecordType;
@@ -843,7 +844,7 @@
frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory(mdProvider)
: frameOpCallbackFactory,
MissingWriterFactory.INSTANCE, hasSecondaries, NoOpTupleProjectorFactory.INSTANCE,
- tuplePartitionerFactory, partitionsMap);
+ tuplePartitionerFactory, partitionsMap, NoOpLSMTupleFilterCallbackFactory.INSTANCE);
RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
filterFields == null ? 0 : filterFields.length, recordType, metaType);
// fix pk fields
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 85fbad1..9d41097 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -116,6 +116,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -877,4 +878,8 @@
public DatasetFormatInfo getDatasetFormatInfo() {
return datasetFormatInfo;
}
+
+ public ILSMTupleFilterCallbackFactory getTupleFilterCallbackFactory() {
+ return NoOpLSMTupleFilterCallbackFactory.INSTANCE; // no-op factory: its callbacks never exclude a tuple
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallback.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallback.java
new file mode 100644
index 0000000..0957a96
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.entities;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallback;
+
+public class NoOpLSMTupleFilterCallback implements ILSMTupleFilterCallback {
+ private static final long serialVersionUID = 1L;
+ public static final NoOpLSMTupleFilterCallback INSTANCE = new NoOpLSMTupleFilterCallback();
+
+ @Override
+ public void initialize(ILSMIndex index) throws HyracksDataException {
+ // Intentionally empty: this callback keeps no state, so nothing is read from the index.
+ }
+
+ @Override
+ public boolean filter(FrameTupleAccessor accessor, int tupleIdx) {
+ return false; // never exclude a tuple: the upsert operator skips a tuple only when this returns true
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallbackFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallbackFactory.java
new file mode 100644
index 0000000..7af24ac
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NoOpLSMTupleFilterCallbackFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.metadata.entities;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
+
+public class NoOpLSMTupleFilterCallbackFactory implements ILSMTupleFilterCallbackFactory {
+ private static final long serialVersionUID = 1L;
+ public static final NoOpLSMTupleFilterCallbackFactory INSTANCE = new NoOpLSMTupleFilterCallbackFactory();
+
+ @Override
+ public ILSMTupleFilterCallback createTupleFilterCallback(int[] fieldPermutation) {
+ return NoOpLSMTupleFilterCallback.INSTANCE; // shared no-op instance; fieldPermutation is not used
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 43f40eb..becdbc1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -104,6 +104,7 @@
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -532,6 +533,9 @@
}
RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+ // get the tuple filter callback factory
+ ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory = dataset.getTupleFilterCallbackFactory();
+
// This allows to project only the indexed fields instead of the entirety of the record
ARecordType requestedType = getPrevRecordType(metadataProvider, dataset, itemType);
ITupleProjectorFactory projectorFactory = IndexUtil.createUpsertTupleProjectorFactory(
@@ -543,7 +547,7 @@
missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType,
fieldIdx, hasSecondaries, projectorFactory, tuplePartitionerFactory,
- partitioningProperties.getComputeStorageMap());
+ partitioningProperties.getComputeStorageMap(), tupleFilterCallbackFactory);
return new Pair<>(op, partitioningProperties.getConstraints());
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
index ee1d388..dda8a58 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
public class LSMPrimaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
@@ -47,6 +48,7 @@
protected final IMissingWriterFactory missingWriterFactory;
protected final boolean hasSecondaries;
private final ITupleProjectorFactory projectorFactory;
+ private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
public LSMPrimaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
@@ -55,8 +57,8 @@
ISearchOperationCallbackFactory searchOpCallbackFactory,
IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, Integer filterSourceIndicator,
ARecordType filterItemType, int filterIndex, boolean hasSecondaries,
- ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory partitionerFactory,
- int[][] partitionsMap) {
+ ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory partitionerFactory, int[][] partitionsMap,
+ ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory) {
super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, null, true,
modificationOpCallbackFactory, partitionerFactory, partitionsMap);
this.frameOpCallbackFactory = frameOpCallbackFactory;
@@ -68,6 +70,7 @@
this.filterIndex = filterIndex;
this.hasSecondaries = hasSecondaries;
this.projectorFactory = projectorFactory;
+ this.tupleFilterCallbackFactory = tupleFilterCallbackFactory;
}
@Override
@@ -77,6 +80,6 @@
return new LSMPrimaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, filterSourceIndicator,
filterItemType, filterIndex, frameOpCallbackFactory, missingWriterFactory, hasSecondaries,
- projectorFactory, tuplePartitionerFactory, partitionsMap);
+ projectorFactory, tuplePartitionerFactory, partitionsMap, tupleFilterCallbackFactory);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 34371e1..8bc2b1c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -77,6 +77,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTupleFilterCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -131,6 +133,7 @@
private final int metaFieldIndex;
protected final ISearchOperationCallback[] searchCallbacks;
protected final IFrameOperationCallback[] frameOpCallbacks;
+ protected final ILSMTupleFilterCallback[] tupleFilterCallbacks;
private final IFrameOperationCallbackFactory frameOpCallbackFactory;
private final ISearchOperationCallbackFactory searchCallbackFactory;
private final IFrameTupleProcessor[] processors;
@@ -140,6 +143,8 @@
private long lastRecordInTimeStamp = 0L;
private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
private final boolean hasSecondaries;
+ private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
+ private final int[] fieldPermutation;
public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -147,12 +152,15 @@
ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, Integer filterSourceIndicator,
ARecordType filterItemType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
IMissingWriterFactory missingWriterFactory, boolean hasSecondaries, ITupleProjectorFactory projectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap,
+ ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
+ this.fieldPermutation = fieldPermutation;
this.hasSecondaries = hasSecondaries;
this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
this.searchCallbacks = new ISearchOperationCallback[partitions.length];
+ this.tupleFilterCallbacks = new ILSMTupleFilterCallback[partitions.length];
this.cursors = new IIndexCursor[partitions.length];
this.processors = new IFrameTupleProcessor[partitions.length];
this.key = new PermutingFrameTupleReference();
@@ -179,6 +187,7 @@
tracer = ctx.getJobletContext().getServiceContext().getTracer();
traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
tupleProjector = projectorFactory.createTupleProjector(ctx);
+ this.tupleFilterCallbackFactory = tupleFilterCallbackFactory;
}
protected void beforeModification(ITupleReference tuple) {
@@ -305,6 +314,8 @@
indexHelpersOpen[i] = true;
indexHelper.open();
indexes[i] = indexHelper.getIndexInstance();
+ tupleFilterCallbacks[i] = tupleFilterCallbackFactory.createTupleFilterCallback(fieldPermutation);
+ tupleFilterCallbacks[i].initialize((ILSMIndex) indexes[i]);
if (((ILSMIndex) indexes[i]).isAtomic()) {
((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear();
}
@@ -410,6 +421,9 @@
int itemCount = accessor.getTupleCount();
for (int i = 0; i < itemCount; i++) {
int storagePartition = tuplePartitioner.partition(accessor, i);
+ if (tupleFilterCallbacks[storagePartitionId2Index.get(storagePartition)].filter(accessor, i)) {
+ continue;
+ }
int pIdx = storagePartitionId2Index.get(storagePartition);
IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k -> new IntOpenHashSet());
tupleIndexes.add(i);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallback.java
new file mode 100644
index 0000000..05a9e1a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public interface ILSMTupleFilterCallback extends Serializable {
+ /**
+ * Populates the state of this filter callback by fetching it from the provided {@code index}.
+ * The primary upsert operator keeps one callback per storage partition and initializes it with that
+ * partition's index once the index is opened; the state is then used to filter tuples for that partition.
+ */
+ void initialize(ILSMIndex index) throws HyracksDataException;
+
+ /**
+ * Returns {@code true} if the tuple at {@code tupleIdx} in {@code accessor} must be skipped, else false.
+ */
+ boolean filter(FrameTupleAccessor accessor, int tupleIdx);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallbackFactory.java
new file mode 100644
index 0000000..dc271fd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMTupleFilterCallbackFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+public interface ILSMTupleFilterCallbackFactory extends Serializable {
+ /**
+ * Creates a tuple filter callback that is given the field permutation of the incoming tuples.
+ * The field permutation contains the positions of the record, meta, and primary key fields,
+ * which lets the callback extract the information it needs for filtering.
+ */
+ ILSMTupleFilterCallback createTupleFilterCallback(int[] fieldPermutation);
+}