[ASTERIXDB-3575][EXT] Pushdown predicates for Parquet external datasets to filter row groups
- user model changes: no
- storage format changes: no
- interface changes: yes
Ext-ref: MB-65316
Change-Id: I2c3214e2a351252fb1929aa1562cbab2d67fa9aa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19633
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index a101bed..bd9e329 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -35,6 +35,7 @@
import org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.ParquetFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -120,6 +121,7 @@
// Performs prefix pushdowns
pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor.add(new DeltaTableFilterPushdownProcessor(pushdownContext, context));
+ pushdownProcessorsExecutor.add(new ParquetFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor
.add(new ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
// Inlines AND/OR expression (must be last to run)
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
index 1622c6e..c4210a3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -26,7 +26,9 @@
import java.util.Set;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.FilterExpressionInlineVisitor;
@@ -84,8 +86,15 @@
public void registerScan(Dataset dataset, List<LogicalVariable> pkList, LogicalVariable recordVariable,
LogicalVariable metaVariable, AbstractScanOperator scanOperator) {
- ScanDefineDescriptor scanDefDesc =
- new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, scanOperator);
+        // Parquet scans get a specialized descriptor that can also hold a row-group filter
+        ScanDefineDescriptor scanDefDesc;
+        if (DatasetUtil.isParquetFormat(dataset)) {
+            scanDefDesc = new ParquetDatasetScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable,
+                    metaVariable, scanOperator);
+        } else {
+            scanDefDesc = new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable,
+                    scanOperator);
+        }
defineChain.put(recordVariable, scanDefDesc);
useChain.put(recordVariable, new ArrayList<>());
if (metaVariable != null) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 023e4da..cf438d8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -31,12 +31,14 @@
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
import org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -121,6 +123,12 @@
Map<String, String> configuration = ((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties();
boolean embedFilterValues = ExternalDataPrefix.containsComputedFields(configuration) && Boolean.parseBoolean(
configuration.getOrDefault(ExternalDataConstants.KEY_EMBED_FILTER_VALUES, ExternalDataConstants.TRUE));
+ if (DatasetUtil.isParquetFormat(dataset)) {
+ return new ParquetExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
+ scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(),
+ ((ParquetDatasetScanDefineDescriptor) scanDefineDescriptor).getRowGroupFilterExpression(),
+ embedFilterValues);
+ }
return new ExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(), embedFilterValues);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java
new file mode 100644
index 0000000..4b4d742
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import java.util.List;
+
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+/** Scan descriptor for Parquet datasets that can additionally carry a row-group filter expression. */
+public class ParquetDatasetScanDefineDescriptor extends ScanDefineDescriptor {
+    // Null until setRowGroupFilterExpression is called (fields default to null, so no explicit init is needed)
+    private ILogicalExpression rowGroupFilterExpression;
+
+    public ParquetDatasetScanDefineDescriptor(int scope, Dataset dataset, List<LogicalVariable> primaryKeyVariables,
+            LogicalVariable recordVariable, LogicalVariable metaRecordVariable, ILogicalOperator operator) {
+        super(scope, dataset, primaryKeyVariables, recordVariable, metaRecordVariable, operator);
+    }
+
+    public ILogicalExpression getRowGroupFilterExpression() {
+        return rowGroupFilterExpression;
+    }
+
+    public void setRowGroupFilterExpression(ILogicalExpression rowGroupFilterExpression) {
+        this.rowGroupFilterExpression = rowGroupFilterExpression;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 1ec64d5..0d79767 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -61,7 +61,7 @@
protected final ExpressionToExpectedSchemaNodeVisitor exprToNodeVisitor;
protected final ColumnFilterPathBuilderVisitor pathBuilderVisitor;
protected final Map<ILogicalExpression, ARecordType> paths;
- private final ArrayPathCheckerVisitor checkerVisitor;
+ protected final ArrayPathCheckerVisitor checkerVisitor;
public ColumnFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
super(pushdownContext, context);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
new file mode 100644
index 0000000..25db9f9
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.RANGE_FILTER_PUSHABLE_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/** Pushes predicates built from RANGE_FILTER_PUSHABLE_FUNCTIONS down to Parquet scans as a row-group filter. */
+public class ParquetFilterPushdownProcessor extends ColumnFilterPushdownProcessor {
+
+    public ParquetFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+        super(pushdownContext, context);
+    }
+
+    /** Skips every scan whose dataset is not in Parquet format. */
+    @Override
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
+        return !DatasetUtil.isParquetFormat(scanDefineDescriptor.getDataset());
+    }
+
+    /** A function call is pushable only if its identifier is in RANGE_FILTER_PUSHABLE_FUNCTIONS. */
+    @Override
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
+        FunctionIdentifier fid = expression.getFunctionIdentifier();
+        return !RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(fid);
+    }
+
+    /** Records the path inferred for the expression; returns false if it does not map to an ANY schema node. */
+    @Override
+    protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+        IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+        if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+            return false;
+        }
+        // The inferred path from the provided expression
+        ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+        paths.put(expression, expressionPath);
+        return true;
+    }
+
+    /** ANDs the predicate into the scan's row-group filter; does nothing if multiple array paths are used. */
+    @Override
+    protected void putFilterInformation(ScanDefineDescriptor scanDefineDescriptor, ILogicalExpression inlinedExpr)
+            throws AlgebricksException {
+        if (checkerVisitor.containsMultipleArrayPaths(paths.values())) {
+            // Cannot pushdown a filter with multiple unnest
+            // TODO allow rewindable column readers for filters
+            // TODO this is a bit conservative (maybe too conservative) as we can push part of expression down
+            return;
+        }
+        ParquetDatasetScanDefineDescriptor scanDefDesc = (ParquetDatasetScanDefineDescriptor) scanDefineDescriptor;
+        ILogicalExpression prior = scanDefDesc.getRowGroupFilterExpression();
+        // Conjoin with any previously pushed predicate; the first predicate is stored as-is
+        scanDefDesc.setRowGroupFilterExpression(prior == null ? inlinedExpr : andExpression(prior, inlinedExpr));
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
index b560a5b..805b9aa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
index 2bfad06..a57424c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
index afb74f1..5e23176 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department embed-filter-value: true row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
index fc48056..5832f4f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
index 8dcaa95..ecbd5ee 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
index a99cc0c..35da948 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
index a7c5727..23892f1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.Department [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.Department row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
index b48def6..9fb8ee1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
@@ -16,7 +16,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
index 2b41307..9dab9c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
@@ -14,7 +14,7 @@
-- STREAM_SELECT |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p1] <- test.ParquetDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p1] <- test.ParquetDataset1 row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
index 3993a4d..2fca35d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
@@ -36,7 +36,7 @@
-- STREAM_SELECT |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
index 9b93e19..945f2da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
@@ -36,7 +36,7 @@
-- STREAM_SELECT |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
index cccf9a3..6941720 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
@@ -44,7 +44,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("x"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
index cf265bf..316e465 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
@@ -44,7 +44,7 @@
-- ASSIGN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("y"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DATASOURCE_SCAN |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java
new file mode 100644
index 0000000..774e908
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+
+/**
+ * An {@link IExternalFilterEvaluatorFactory} for Parquet external datasets. It forwards evaluator and
+ * value-embedder creation to a wrapped factory and additionally carries the Parquet {@link FilterPredicate}
+ * that reader factories can retrieve through {@link #getFilterExpression()}.
+ */
+public class ParquetFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+    // Factory that all evaluator/embedder creation is forwarded to
+    private final IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory;
+    // Parquet predicate handed out by getFilterExpression(); stored exactly as given to the constructor
+    private final FilterPredicate filterExpression;
+
+    /**
+     * @param externalFilterEvaluatorFactory factory that evaluator and value-embedder creation is forwarded to
+     * @param expression                     Parquet predicate returned as-is by {@link #getFilterExpression()}
+     */
+    public ParquetFilterEvaluatorFactory(IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory,
+            FilterPredicate expression) {
+        this.filterExpression = expression;
+        this.externalFilterEvaluatorFactory = externalFilterEvaluatorFactory;
+    }
+
+    /**
+     * @return the Parquet predicate this factory was constructed with
+     */
+    public FilterPredicate getFilterExpression() {
+        return this.filterExpression;
+    }
+
+    /**
+     * Forwards to the wrapped factory.
+     */
+    @Override
+    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+            throws HyracksDataException {
+        return this.externalFilterEvaluatorFactory.create(serviceContext, warningCollector);
+    }
+
+    /**
+     * Forwards to the wrapped factory.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+        return this.externalFilterEvaluatorFactory.createValueEmbedder(warningCollector);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 2d92e10..f5dc7a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -44,6 +45,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import com.amazonaws.SdkBaseException;
@@ -91,6 +94,13 @@
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
configureAwsS3HdfsJobConf(appCtx, conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
+ if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+ FilterPredicate parquetFilterPredicate =
+ ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ if (parquetFilterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+ }
+ }
} catch (SdkException | SdkBaseException ex) {
throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
} catch (AlgebricksException ex) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 530ce74..72c9977 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -41,6 +42,8 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobItem;
@@ -82,6 +85,13 @@
JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
+ if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+ FilterPredicate parquetFilterPredicate =
+ ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ if (parquetFilterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+ }
+ }
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index 4dedb08..1db4445 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -40,6 +41,8 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.PathItem;
@@ -69,6 +72,13 @@
JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
+ if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+ FilterPredicate parquetFilterPredicate =
+ ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ if (parquetFilterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+ }
+ }
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 874c3bd..9469747 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -39,6 +40,8 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
import com.google.cloud.storage.Blob;
@@ -76,6 +79,13 @@
int numberOfPartitions = getPartitionConstraint().getLocations().length;
GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
+ if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+ FilterPredicate parquetFilterPredicate =
+ ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+ if (parquetFilterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+ }
+ }
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 2551b86..0cd150e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -854,4 +854,9 @@
return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
.isDeltaTable(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
}
+
+ public static boolean isParquetFormat(Dataset dataset) {
+ return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
+ .isParquetFormat(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 5939290..dfefb8f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.metadata.utils;
import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -43,6 +44,7 @@
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory;
import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -53,6 +55,7 @@
import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder;
import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.ParquetFilterBuilder;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.types.ARecordType;
@@ -61,6 +64,7 @@
import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -346,6 +350,25 @@
(ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
return builder.build();
}
+ } else if (isParquetFormat(properties)) {
+ if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
+ } else {
+ ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+ ExternalDatasetProjectionFiltrationInfo pfi =
+ (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
+ IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory =
+ NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ if (!prefix.getPaths().isEmpty()) {
+ ExternalFilterBuilder externalFilterBuilder =
+ new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
+ externalFilterEvaluatorFactory = externalFilterBuilder.build();
+ }
+ ParquetFilterBuilder builder = new ParquetFilterBuilder(
+ (ParquetExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
+ return new ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory,
+ builder.buildFilterPredicate());
+ }
} else {
if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
return NoOpExternalFilterEvaluatorFactory.INSTANCE;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java
new file mode 100644
index 0000000..38ad119
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.io.api.Binary;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Predicate;
+
+/**
+ * Translates the row-group filter expression pushed down to a Parquet external dataset scan into a Parquet
+ * {@link FilterPredicate}.
+ * <p>
+ * Supported expressions are comparisons ({@code eq}, {@code ge}, {@code gt}, {@code le}, {@code lt}) between a
+ * column path and a constant, combined with {@code and}/{@code or}. The translation is best-effort: when any part
+ * of the expression cannot be translated, {@link #buildFilterPredicate()} returns {@code null}.
+ */
+public class ParquetFilterBuilder extends AbstractFilterBuilder {
+
+    private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+
+    /**
+     * @param projectionFiltrationInfo supplies the filter paths and the row-group filter expression to translate
+     * @param context                  job generation context handed to {@link AbstractFilterBuilder}
+     * @param typeEnv                  type environment handed to {@link AbstractFilterBuilder}
+     */
+    public ParquetFilterBuilder(ParquetExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
+            JobGenContext context, IVariableTypeEnvironment typeEnv) {
+        super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getParquetRowGroupFilterExpression(),
+                context, typeEnv);
+    }
+
+    /**
+     * Builds the Parquet predicate for the row-group filter expression.
+     *
+     * @return the translated predicate, or {@code null} when there is no filter expression or when the
+     *         expression could not be translated
+     */
+    public FilterPredicate buildFilterPredicate() throws AlgebricksException {
+        if (filterExpression == null) {
+            return null;
+        }
+        try {
+            return createFilterExpression(filterExpression);
+        } catch (Exception e) {
+            // Deliberate best-effort: an untranslatable expression is logged and no predicate is returned
+            // instead of failing the caller.
+            LOGGER.error("Error creating Parquet row-group filter expression ", e);
+            return null;
+        }
+    }
+
+    /**
+     * Translates a single {@code column <op> constant} comparison. The Parquet column type is chosen from the
+     * type of the constant.
+     *
+     * @param columnName expression identifying the column; must be a key of the filter paths
+     * @param constValue the constant operand
+     * @param fid        the comparison function
+     * @throws RuntimeException if the constant is null/missing, its type is not supported, or the comparison
+     *                          function is not supported for that type
+     */
+    private FilterPredicate createComparisonExpression(ILogicalExpression columnName, ILogicalExpression constValue,
+            FunctionIdentifier fid) throws AlgebricksException {
+        ConstantExpression constExpr = (ConstantExpression) constValue;
+        if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) {
+            throw new RuntimeException("Unsupported literal type: " + constExpr.getValue());
+        }
+        AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue();
+        String fieldName = createColumnExpression(columnName);
+        switch (constantValue.getObject().getType().getTypeTag()) {
+            case STRING:
+                return createComparisonFunction(FilterApi.binaryColumn(fieldName),
+                        Binary.fromString(((AString) constantValue.getObject()).getStringValue()), fid);
+            case TINYINT:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt8) constantValue.getObject()).getByteValue(), fid);
+            case SMALLINT:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt16) constantValue.getObject()).getShortValue(), fid);
+            case INTEGER:
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        ((AInt32) constantValue.getObject()).getIntegerValue(), fid);
+            case BOOLEAN:
+                // Only equality is translated for boolean columns
+                if (!fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+                    throw new RuntimeException("Unsupported comparison function: " + fid);
+                }
+                return FilterApi.eq(FilterApi.booleanColumn(fieldName), constantValue.isTrue());
+            case BIGINT:
+                return createComparisonFunction(FilterApi.longColumn(fieldName),
+                        ((AInt64) constantValue.getObject()).getLongValue(), fid);
+            case DOUBLE:
+                return createComparisonFunction(FilterApi.doubleColumn(fieldName),
+                        ((ADouble) constantValue.getObject()).getDoubleValue(), fid);
+            case DATE:
+                // Compared as an int column holding the chronon time in days
+                return createComparisonFunction(FilterApi.intColumn(fieldName),
+                        ((ADate) constantValue.getObject()).getChrononTimeInDays(), fid);
+            case DATETIME:
+                // Compared as a long column in microseconds.
+                // NOTE(review): presumably the Parquet column stores timestamps in micros -- confirm
+                long millis = ((ADateTime) constantValue.getObject()).getChrononTime();
+                return createComparisonFunction(FilterApi.longColumn(fieldName),
+                        TimeUnit.MILLISECONDS.toMicros(millis), fid);
+            default:
+                throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType());
+        }
+    }
+
+    /**
+     * Always returns {@code null}; nothing in this class calls it.
+     */
+    @Override
+    protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) {
+        return null;
+    }
+
+    /**
+     * Translates a function-call expression: {@code and}/{@code or} with two or more arguments, or a
+     * two-argument comparison.
+     *
+     * @throws RuntimeException if the expression is not a function call or has an unsupported shape
+     */
+    private FilterPredicate createFilterExpression(ILogicalExpression expr) throws AlgebricksException {
+        if (expr == null || expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new RuntimeException("Unsupported expression: " + expr);
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        IFunctionDescriptor fd = resolveFunction(funcExpr);
+        FunctionIdentifier fid = fd.getIdentifier();
+        List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+        if (fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR)) {
+            // FilterApi.and/or are binary, so at least two operands are required
+            if (args.size() < 2) {
+                throw new RuntimeException("Unsupported function: " + funcExpr);
+            }
+            return createAndOrPredicate(fid, args, 0);
+        }
+        if (args.size() != 2) {
+            throw new RuntimeException("Unsupported function: " + funcExpr);
+        }
+        return createComparisonExpression(args.get(0).getValue(), args.get(1).getValue(), fid);
+    }
+
+    /**
+     * Maps a comparison function identifier to the matching {@link FilterApi} predicate.
+     *
+     * @param column the Parquet column to compare
+     * @param value  the constant to compare against
+     * @param fid    one of EQ, GE, GT, LE, LT
+     * @throws RuntimeException for any other function
+     */
+    private <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsLtGt> FilterPredicate createComparisonFunction(
+            C column, T value, FunctionIdentifier fid) {
+        if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+            return FilterApi.eq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+            return FilterApi.gtEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+            return FilterApi.gt(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+            return FilterApi.ltEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+            return FilterApi.lt(column, value);
+        } else {
+            throw new RuntimeException("Unsupported function: " + fid);
+        }
+    }
+
+    /**
+     * Resolves the dot-separated column path for the given expression from the filter paths.
+     *
+     * @return the field name, or the nested field names joined with {@code "."}
+     * @throws RuntimeException if the expression has no registered path or the path is not a single chain of
+     *                          object fields
+     */
+    protected String createColumnExpression(ILogicalExpression expression) {
+        ARecordType path = filterPaths.get(expression);
+        // An expression without a registered path (e.g. not a plain field access) cannot name a column
+        if (path == null || path.getFieldNames().length != 1) {
+            throw new RuntimeException("Unsupported column expression: " + expression);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) {
+            // The field could be a nested field
+            List<String> fieldList = new ArrayList<>();
+            fieldList = createPathExpression(path, fieldList);
+            return String.join(".", fieldList);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) {
+            return path.getFieldNames()[0];
+        } else {
+            throw new RuntimeException("Unsupported column expression: " + expression);
+        }
+    }
+
+    /**
+     * Walks a single-field record type chain, appending each field name to {@code fieldList} until a field of
+     * type ANY is reached.
+     *
+     * @return {@code fieldList} with the names along the path appended
+     * @throws RuntimeException if a level has more than one field or a field is neither OBJECT nor ANY
+     */
+    private List<String> createPathExpression(ARecordType path, List<String> fieldList) {
+        if (path.getFieldNames().length != 1) {
+            throw new RuntimeException("Error creating column expression");
+        } else {
+            fieldList.add(path.getFieldNames()[0]);
+        }
+        if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) {
+            return createPathExpression((ARecordType) path.getFieldTypes()[0], fieldList);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) {
+            return fieldList;
+        } else {
+            throw new RuntimeException("Error creating column expression");
+        }
+    }
+
+    /**
+     * Folds an n-ary {@code and}/{@code or} into nested binary predicates, starting at {@code index}:
+     * {@code or(pred1, pred2, pred3)} becomes {@code or(pred1, or(pred2, pred3))}.
+     *
+     * @param function AND or OR
+     * @param args     the operands; must contain at least {@code index + 2} elements
+     * @param index    position of the left operand for this level
+     */
+    private FilterPredicate createAndOrPredicate(FunctionIdentifier function, List<Mutable<ILogicalExpression>> args,
+            int index) throws AlgebricksException {
+        FilterPredicate left = createFilterExpression(args.get(index).getValue());
+        // The last pair combines the two remaining operands (index and index + 1); every earlier level
+        // combines its own operand with the fold of the rest.
+        FilterPredicate right = index == args.size() - 2 ? createFilterExpression(args.get(index + 1).getValue())
+                : createAndOrPredicate(function, args, index + 1);
+        return function.equals(AlgebricksBuiltinFunctions.AND) ? FilterApi.and(left, right)
+                : FilterApi.or(left, right);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index e0ea99a..f95483e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -43,7 +43,7 @@
protected final ARecordType projectedType;
protected final ILogicalExpression filterExpression;
protected final Map<String, FunctionCallInformation> functionCallInfoMap;
- private final boolean embedFilterValues;
+ protected final boolean embedFilterValues;
protected final Map<ILogicalExpression, ARecordType> filterPaths;
public ExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java
new file mode 100644
index 0000000..323d3ed
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+/**
+ * Projection/filtration info for Parquet external datasets. On top of what
+ * {@link ExternalDatasetProjectionFiltrationInfo} carries, it holds the expression used to filter Parquet
+ * row groups and includes it in the printed plan and in equality checks.
+ */
+public class ParquetExternalDatasetProjectionFiltrationInfo extends ExternalDatasetProjectionFiltrationInfo {
+
+    // Set once at construction; may be null, in which case the print methods emit nothing for it
+    private final ILogicalExpression parquetRowGroupFilterExpression;
+
+    /**
+     * @param projectedType                   passed to the superclass
+     * @param sourceInformationMap            passed to the superclass
+     * @param filterPaths                     passed to the superclass
+     * @param filterExpression                passed to the superclass
+     * @param parquetRowGroupFilterExpression expression for filtering Parquet row groups; may be {@code null}
+     * @param embedFilterValues               passed to the superclass
+     */
+    public ParquetExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
+            Map<String, FunctionCallInformation> sourceInformationMap, Map<ILogicalExpression, ARecordType> filterPaths,
+            ILogicalExpression filterExpression, ILogicalExpression parquetRowGroupFilterExpression,
+            boolean embedFilterValues) {
+        super(projectedType, sourceInformationMap, filterPaths, filterExpression, embedFilterValues);
+        this.parquetRowGroupFilterExpression = parquetRowGroupFilterExpression;
+    }
+
+    /**
+     * Copy constructor used by {@link #createCopy()}. The row-group filter expression instance is shared with
+     * {@code other}, not cloned.
+     */
+    private ParquetExternalDatasetProjectionFiltrationInfo(ParquetExternalDatasetProjectionFiltrationInfo other) {
+        super(other.projectedType, other.functionCallInfoMap, other.filterPaths, other.filterExpression,
+                other.embedFilterValues);
+        this.parquetRowGroupFilterExpression = other.parquetRowGroupFilterExpression;
+    }
+
+    /**
+     * @return the row-group filter expression, or {@code null} if none was set
+     */
+    public ILogicalExpression getParquetRowGroupFilterExpression() {
+        return parquetRowGroupFilterExpression;
+    }
+
+    /**
+     * Prints the superclass output, followed by {@code " row-group-filter on: <expression>"} when a row-group
+     * filter expression is present.
+     */
+    @Override
+    public void print(AlgebricksStringBuilderWriter writer) {
+        super.print(writer);
+        if (parquetRowGroupFilterExpression != null) {
+            writer.append(" row-group-filter on: ");
+            writer.append(parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    /**
+     * Writes the superclass fields, followed by a {@code row-group-filter-on} string field when a row-group
+     * filter expression is present.
+     */
+    @Override
+    public void print(JsonGenerator generator) throws IOException {
+        super.print(generator);
+        if (parquetRowGroupFilterExpression != null) {
+            generator.writeStringField("row-group-filter-on", parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    @Override
+    public ParquetExternalDatasetProjectionFiltrationInfo createCopy() {
+        return new ParquetExternalDatasetProjectionFiltrationInfo(this);
+    }
+
+    /**
+     * Two instances are equal when they have the same runtime class, the superclass considers them equal, and
+     * their row-group filter expressions match according to {@code filterExpressionEquals}.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ParquetExternalDatasetProjectionFiltrationInfo otherInfo = (ParquetExternalDatasetProjectionFiltrationInfo) o;
+        return super.equals(o)
+                && filterExpressionEquals(parquetRowGroupFilterExpression, otherInfo.parquetRowGroupFilterExpression);
+    }
+}