[ASTERIXDB-3561][EXT] Predicate pushdown for Delta Tables
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Predicates are pushed down to reduce the number of files to scan.
Ext-ref: MB-65217
Change-Id: If1a46db488ee0f26aeea27069a4668665d1781dc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19406
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index 16aeecb..a101bed 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -32,6 +32,7 @@
import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjectionAndFilterExpressionsProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
@@ -118,6 +119,7 @@
}
// Performs prefix pushdowns
pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+ pushdownProcessorsExecutor.add(new DeltaTableFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor
.add(new ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
// Inlines AND/OR expression (must be last to run)
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
new file mode 100644
index 0000000..8e5bf5b
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class DeltaTableFilterPushdownProcessor extends ColumnFilterPushdownProcessor {
+
+ public DeltaTableFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+ super(pushdownContext, context);
+ }
+
+ @Override
+ protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
+ return !DatasetUtil.isDeltaTable(scanDefineDescriptor.getDataset());
+ }
+
+ @Override
+ protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
+ FunctionIdentifier fid = expression.getFunctionIdentifier();
+ return ARRAY_FUNCTIONS.contains(fid) || super.isNotPushable(expression);
+ }
+
+ @Override
+ protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+ IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+ if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+ return false;
+ }
+
+ // The inferred path from the provided expression
+ ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+ paths.put(expression, expressionPath);
+ return true;
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index c0b5e26..9a0be3f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -446,7 +446,8 @@
loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine", PARQUET_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one/_delta_log", JSON_FILTER, "delta-data/");
loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/");
-
+ loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table", PARQUET_FILTER, "delta-data/");
+ loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table/_delta_log", JSON_FILTER, "delta-data/");
}
private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 67d460c..7c8840a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -23,8 +23,10 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -59,6 +61,8 @@
"target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_one";
public static final String DELTA_FILE_SIZE_NINE =
"target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine";
+ public static final String DELTA_PARTITIONED_TABLE =
+ "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "partitioned_delta_table";
public static void prepareDeltaTableContainer(Configuration conf) {
File basePath = new File(".");
@@ -68,6 +72,7 @@
prepareEmptyTable(conf);
prepareFileSizeOne(conf);
prepareFileSizeNine(conf);
+ preparePartitionedTable(conf);
}
public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) {
@@ -352,4 +357,145 @@
throw new RuntimeException(e);
}
}
+
+ public static void preparePartitionedTable(Configuration conf) {
+ Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name")
+ .requiredString("date").requiredString("hour").endRecord();
+ try {
+ List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema),
+ new GenericData.Record(schema), new GenericData.Record(schema));
+ List<GenericData.Record> fileSecondSnapshotRecords =
+ List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+ List<GenericData.Record> fileThirdSnapshotRecords =
+ List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+ List<GenericData.Record> fileFourthSnapshotRecords =
+ List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+
+ fileFirstSnapshotRecords.get(0).put("id", 0);
+ fileFirstSnapshotRecords.get(0).put("name", "Order 1");
+ fileFirstSnapshotRecords.get(0).put("date", "01-01-2025");
+ fileFirstSnapshotRecords.get(0).put("hour", 10);
+
+ fileFirstSnapshotRecords.get(1).put("id", 1);
+ fileFirstSnapshotRecords.get(1).put("name", "Order 2");
+ fileFirstSnapshotRecords.get(1).put("date", "01-01-2025");
+ fileFirstSnapshotRecords.get(1).put("hour", 10);
+
+ fileFirstSnapshotRecords.get(2).put("id", 2);
+ fileFirstSnapshotRecords.get(2).put("name", "Order 3");
+ fileFirstSnapshotRecords.get(2).put("date", "01-01-2025");
+ fileFirstSnapshotRecords.get(2).put("hour", 10);
+
+ fileSecondSnapshotRecords.get(0).put("id", 3);
+ fileSecondSnapshotRecords.get(0).put("name", "Order 10");
+ fileSecondSnapshotRecords.get(0).put("date", "01-01-2025");
+ fileSecondSnapshotRecords.get(0).put("hour", 15);
+
+ fileSecondSnapshotRecords.get(1).put("id", 4);
+ fileSecondSnapshotRecords.get(1).put("name", "Order 11");
+ fileSecondSnapshotRecords.get(1).put("date", "01-01-2025");
+ fileSecondSnapshotRecords.get(1).put("hour", 15);
+
+ fileThirdSnapshotRecords.get(0).put("id", 5);
+ fileThirdSnapshotRecords.get(0).put("name", "Order 21");
+ fileThirdSnapshotRecords.get(0).put("date", "01-02-2025");
+ fileThirdSnapshotRecords.get(0).put("hour", 12);
+
+ fileThirdSnapshotRecords.get(1).put("id", 6);
+ fileThirdSnapshotRecords.get(1).put("name", "Order 22");
+ fileThirdSnapshotRecords.get(1).put("date", "01-02-2025");
+ fileThirdSnapshotRecords.get(1).put("hour", 12);
+
+ fileFourthSnapshotRecords.get(0).put("id", 7);
+ fileFourthSnapshotRecords.get(0).put("name", "Order 30");
+ fileFourthSnapshotRecords.get(0).put("date", "01-02-2025");
+ fileFourthSnapshotRecords.get(0).put("hour", 16);
+
+ fileFourthSnapshotRecords.get(1).put("id", 8);
+ fileFourthSnapshotRecords.get(1).put("name", "Order 31");
+ fileFourthSnapshotRecords.get(1).put("date", "01-02-2025");
+ fileFourthSnapshotRecords.get(1).put("hour", 16);
+
+ Path path = new Path(DELTA_PARTITIONED_TABLE, "firstFile.parquet");
+ ParquetWriter<GenericData.Record> writer =
+ AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build();
+ for (GenericData.Record record : fileFirstSnapshotRecords) {
+ writer.write(record);
+ }
+ long size = writer.getDataSize();
+ writer.close();
+
+ Path path2 = new Path(DELTA_PARTITIONED_TABLE, "secondFile.parquet");
+ ParquetWriter<GenericData.Record> writer2 =
+ AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build();
+ for (GenericData.Record record : fileSecondSnapshotRecords) {
+ writer2.write(record);
+ }
+ long size2 = writer2.getDataSize();
+ writer2.close();
+
+ Path path3 = new Path(DELTA_PARTITIONED_TABLE, "thirdFile.parquet");
+ ParquetWriter<GenericData.Record> writer3 =
+ AvroParquetWriter.<GenericData.Record> builder(path3).withConf(conf).withSchema(schema).build();
+ for (GenericData.Record record : fileThirdSnapshotRecords) {
+ writer3.write(record);
+ }
+ long size3 = writer3.getDataSize();
+ writer3.close();
+
+ Path path4 = new Path(DELTA_PARTITIONED_TABLE, "fourthFile.parquet");
+ ParquetWriter<GenericData.Record> writer4 =
+ AvroParquetWriter.<GenericData.Record> builder(path4).withConf(conf).withSchema(schema).build();
+ for (GenericData.Record record : fileFourthSnapshotRecords) {
+ writer4.write(record);
+ }
+ long size4 = writer4.getDataSize();
+ writer4.close();
+
+ DeltaLog log = DeltaLog.forTable(conf, DELTA_PARTITIONED_TABLE);
+ OptimisticTransaction txn = log.startTransaction();
+ Metadata metaData = txn.metadata().copyBuilder().partitionColumns(Arrays.asList("date", "hour"))
+ .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+ .add(new StructField("name", new StringType(), true))
+ .add(new StructField("date", new StringType(), true))
+ .add(new StructField("hour", new IntegerType(), true)))
+ .build();
+
+ Map<String, String> partitionValues = new HashMap<>();
+ partitionValues.put("date", "01-01-2025");
+ partitionValues.put("hour", "10");
+ List<Action> actions = List.of(new AddFile("firstFile.parquet", partitionValues, size,
+ System.currentTimeMillis(), true, null, null));
+ txn.updateMetadata(metaData);
+ txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+ txn = log.startTransaction();
+ partitionValues.clear();
+ partitionValues.put("date", "01-01-2025");
+ partitionValues.put("hour", "15");
+ actions = List.of(new AddFile("secondFile.parquet", partitionValues, size2, System.currentTimeMillis(),
+ true, null, null));
+ txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+ txn = log.startTransaction();
+ partitionValues.clear();
+ partitionValues.put("date", "01-02-2025");
+ partitionValues.put("hour", "12");
+ actions = List.of(new AddFile("thirdFile.parquet", partitionValues, size3, System.currentTimeMillis(), true,
+ null, null));
+ txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+ txn = log.startTransaction();
+ partitionValues.clear();
+ partitionValues.put("date", "01-02-2025");
+ partitionValues.put("hour", "16");
+ actions = List.of(new AddFile("fourthFile.parquet", partitionValues, size4, System.currentTimeMillis(),
+ true, null, null));
+ txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp
new file mode 100644
index 0000000..a085d6f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/partitioned_delta_table"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp
new file mode 100644
index 0000000..73ef0f9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds WHERE date = "01-01-2025" order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm
new file mode 100644
index 0000000..86d41fe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm
@@ -0,0 +1,5 @@
+{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 }
+{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 }
+{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 }
+{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
+{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 1a05334..e8e89de 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -575,6 +575,12 @@
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-partitioned-file-read">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/deltalake-partitioned-file-read</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
<compilation-unit name="common/avro/avro-types/avro-map">
<placeholder name="adapter" value="S3" />
<placeholder name="path_prefix" value="" />
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000..c954f9b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+import io.delta.kernel.expressions.Expression;
+
+public class DeltaTableFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+ private final Expression filterExpression;
+
+ public DeltaTableFilterEvaluatorFactory(Expression expression) {
+ this.filterExpression = expression;
+ }
+
+ @Override
+ public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+ throws HyracksDataException {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+ return NoOpFilterValueEmbedder.INSTANCE;
+ }
+
+ public Expression getFilterExpression() {
+ return filterExpression;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000..406a630
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class NoOpDeltaTableFilterEvaluatorFactory extends DeltaTableFilterEvaluatorFactory {
+ public static final IExternalFilterEvaluatorFactory INSTANCE = new NoOpDeltaTableFilterEvaluatorFactory();
+ private static final long serialVersionUID = 1L;
+
+ private NoOpDeltaTableFilterEvaluatorFactory() {
+ super(null);
+ }
+
+ @Override
+ public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector) {
+ return NoOpExternalFilterEvaluator.INSTANCE;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+ return NoOpFilterValueEmbedder.INSTANCE;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index e3313f7..3c998a5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -39,6 +39,7 @@
import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
@@ -62,6 +63,8 @@
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
@@ -124,19 +127,35 @@
} catch (AsterixDeltaRuntimeException e) {
throw e.getHyracksDataException();
}
- Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+ Expression filterExpression = ((DeltaTableFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ Scan scan;
+ if (filterExpression != null) {
+ scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema)
+ .withFilter(engine, (Predicate) filterExpression).build();
+ } else {
+ scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+ }
scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
List<Row> scanFiles = new ArrayList<>();
while (iter.hasNext()) {
- FilteredColumnarBatch batch = iter.next();
+ FilteredColumnarBatch batch = null;
+ try {
+ batch = iter.next();
+ } catch (UnsupportedOperationException e) {
+ // Failed to apply expression due to type mismatch. We can skip the files where partitioned column
+ // type is different from the type of value provided in the predicate
+ LOGGER.info("Unsupported operation {}", e.getMessage());
+ continue;
+ }
CloseableIterator<Row> rowIter = batch.getRows();
while (rowIter.hasNext()) {
Row row = rowIter.next();
scanFiles.add(row);
}
}
+ LOGGER.info("Number of files to scan: {}", scanFiles.size());
locationConstraints = getPartitions(appCtx);
configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 923dbd4..7df9b47 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -744,4 +744,9 @@
return dataset.getDatasetType() == DatasetType.INTERNAL
&& dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN;
}
+
+ public static boolean isDeltaTable(Dataset dataset) {
+ return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
+ .isDeltaTable(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index dda2111..5939290 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.metadata.utils;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -40,6 +41,7 @@
import org.apache.asterix.common.metadata.MetadataConstants;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory;
import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
@@ -49,6 +51,7 @@
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.utils.filter.ColumnFilterBuilder;
import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder;
import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.base.IAObject;
@@ -335,16 +338,25 @@
public static IExternalFilterEvaluatorFactory createExternalFilterEvaluatorFactory(JobGenContext context,
IVariableTypeEnvironment typeEnv, IProjectionFiltrationInfo projectionFiltrationInfo,
Map<String, String> properties) throws AlgebricksException {
- if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
- return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ if (isDeltaTable(properties)) {
+ if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
+ } else {
+ DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder(
+ (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
+ return builder.build();
+ }
+ } else {
+ if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ } else {
+ ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+ ExternalDatasetProjectionFiltrationInfo pfi =
+ (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
+ ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
+ return build.build();
+ }
}
-
- ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
- ExternalDatasetProjectionFiltrationInfo pfi =
- (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
- ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
-
- return build.build();
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
index ce39220..f41a116 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
@@ -100,7 +100,7 @@
return argsEvalFactories;
}
- private IFunctionDescriptor resolveFunction(AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
+ protected IFunctionDescriptor resolveFunction(AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
IFunctionManager functionManager = metadataProvider.getFunctionManager();
FunctionIdentifier fnId = funcExpr.getFunctionIdentifier();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
new file mode 100644
index 0000000..73ed81e
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.logging.log4j.LogManager;
+
+import com.microsoft.azure.storage.core.Logger;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+public class DeltaTableFilterBuilder extends AbstractFilterBuilder {
+
+ private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+
+ public DeltaTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
+ JobGenContext context, IVariableTypeEnvironment typeEnv) {
+ super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getFilterExpression(), context,
+ typeEnv);
+ }
+
+ public IExternalFilterEvaluatorFactory build() throws AlgebricksException {
+ Expression deltaTablePredicate = null;
+ if (filterExpression != null) {
+ try {
+ deltaTablePredicate = createExpression(filterExpression);
+ } catch (Exception e) {
+ LOGGER.error("Error creating DeltaTable filter expression ", e);
+ }
+ }
+ if (deltaTablePredicate != null && !(deltaTablePredicate instanceof Predicate)) {
+ deltaTablePredicate = null;
+ }
+ return new DeltaTableFilterEvaluatorFactory(deltaTablePredicate);
+ }
+
+ protected Expression createExpression(ILogicalExpression expression) throws AlgebricksException {
+ if (filterPaths.containsKey(expression)) {
+ // Path expression, create a value accessor (i.e., a column reader)
+ return createColumnExpression(expression);
+ } else if (expression.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+ return createLiteralExpression(expression);
+ } else if (expression.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ return handleFunction(expression);
+ }
+
+ /*
+ * A variable expression: This should not happen as the provided filter expression is inlined.
+ * If a variable was encountered for some reason, it should only be the record variable. If the record variable
+ * was encountered, that means there's a missing value path the compiler didn't provide.
+ */
+ throw new RuntimeException("Unsupported expression " + expression + ". the provided paths are: " + filterPaths);
+ }
+
+ private Expression createLiteralExpression(ILogicalExpression expression) throws AlgebricksException {
+ ConstantExpression constExpr = (ConstantExpression) expression;
+ if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) {
+ throw new RuntimeException("Unsupported literal type: " + constExpr.getValue());
+ }
+ AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue();
+ switch (constantValue.getObject().getType().getTypeTag()) {
+ case STRING:
+ return Literal.ofString(((AString) constantValue.getObject()).getStringValue());
+ case TINYINT:
+ return Literal.ofByte(((AInt8) constantValue.getObject()).getByteValue());
+ case SMALLINT:
+ return Literal.ofShort(((AInt16) constantValue.getObject()).getShortValue());
+ case INTEGER:
+ return Literal.ofInt(((AInt32) constantValue.getObject()).getIntegerValue());
+ case BOOLEAN:
+ return Literal.ofBoolean(constantValue.isTrue());
+ case BIGINT:
+ return Literal.ofLong(((AInt64) constantValue.getObject()).getLongValue());
+ case DOUBLE:
+ return Literal.ofDouble(((ADouble) constantValue.getObject()).getDoubleValue());
+ case DATE:
+ return Literal.ofDate(((ADate) constantValue.getObject()).getChrononTimeInDays());
+ case DATETIME:
+ Long millis = ((ADateTime) constantValue.getObject()).getChrononTime();
+ return Literal.ofTimestamp(TimeUnit.MILLISECONDS.toMicros(millis));
+ default:
+ throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType());
+ }
+ }
+
+ @Override
+ protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) {
+ return null;
+ }
+
+ private Expression handleFunction(ILogicalExpression expr) throws AlgebricksException {
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+ IFunctionDescriptor fd = resolveFunction(funcExpr);
+ List<Expression> args = handleArgs(funcExpr);
+ FunctionIdentifier fid = fd.getIdentifier();
+ if (fid.equals(AlgebricksBuiltinFunctions.AND)) {
+ return new Predicate("AND", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.OR)) {
+ return new Predicate("OR", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+ return new Predicate("=", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+ return new Predicate(">=", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+ return new Predicate(">", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+ return new Predicate("<=", args);
+ } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+ return new Predicate("<", args);
+ } else {
+ throw new RuntimeException("Unsupported function: " + funcExpr);
+ }
+ }
+
+ private List<Expression> handleArgs(AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
+ List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+ List<Expression> argsExpressions = new ArrayList<>();
+ for (int i = 0; i < args.size(); i++) {
+ ILogicalExpression expr = args.get(i).getValue();
+ Expression evalFactory = createExpression(expr);
+ argsExpressions.add(evalFactory);
+ }
+ return argsExpressions;
+ }
+
+ protected Column createColumnExpression(ILogicalExpression expression) {
+ ARecordType path = filterPaths.get(expression);
+ if (path.getFieldNames().length != 1) {
+ throw new RuntimeException("Unsupported expression: " + expression);
+ }
+ return new Column(path.getFieldNames()[0]);
+ }
+}