[ASTERIXDB-3237][RT] Add external filter rewrite pass
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch adds the rewrite pass to push down
predicates that contain expressions that access
external datasets' computed fields.
Change-Id: If3f907191431e4d06e0ebf23e0db9f1f95595ca7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17704
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index b8f66f7..3b298d0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -27,11 +27,12 @@
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownProcessorsExecutor;
import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.processor.InlineFilterExpressionsProcessor;
-import org.apache.asterix.optimizer.rules.pushdown.processor.PushdownProcessorsExecutor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -76,6 +77,7 @@
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
+ // TODO this should be revised after introducing the proper compiler flags
if (!context.getPhysicalOptimizationConfig().isExternalFieldPushdown() || !run) {
//The rule was fired, or value access pushdown is disabled
return false;
@@ -109,9 +111,11 @@
if (context.getPhysicalOptimizationConfig().isColumnFilterEnabled()) {
// Performs filter pushdowns
pushdownProcessorsExecutor.add(new ColumnFilterPushdownProcessor(pushdownContext, context));
- // Perform range-filter pushdowns
+ // Performs range-filter pushdowns
pushdownProcessorsExecutor.add(new ColumnRangeFilterPushdownProcessor(pushdownContext, context));
- // Inline AND/OR expression
+ // Performs prefix pushdowns
+ pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+ // Inlines AND/OR expression (must be last to run)
pushdownProcessorsExecutor.add(new InlineFilterExpressionsProcessor(pushdownContext, context));
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
similarity index 97%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 4106337..1530f55 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.optimizer.rules.pushdown.processor;
+package org.apache.asterix.optimizer.rules.pushdown;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
@@ -28,8 +28,8 @@
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor;
import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index 4dbb06d..4b57617 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -69,7 +69,7 @@
* @param scanDefineDescriptor data-scan descriptor
* @return true to skip, false otherwise
*/
- protected abstract boolean skip(ScanDefineDescriptor scanDefineDescriptor);
+ protected abstract boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException;
/**
* Prepare data-scan for a pushdown
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 00b2dc4..8043f32 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -73,7 +73,7 @@
}
@Override
- protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) {
+ protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
Dataset dataset = scanDefineDescriptor.getDataset();
LogicalOperatorTag scanOpTag = scanDefineDescriptor.getOperator().getOperatorTag();
/*
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
new file mode 100644
index 0000000..bb5c853
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ExternalDatasetFilterPushdownProcessor extends ColumnFilterPushdownProcessor {
+ private ExternalDataPrefix prefix;
+
+ public ExternalDatasetFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+ super(pushdownContext, context);
+ }
+
+ @Override
+ protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
+ Dataset dataset = scanDefineDescriptor.getDataset();
+ LogicalOperatorTag scanOpTag = scanDefineDescriptor.getOperator().getOperatorTag();
+ if (dataset.getDatasetType() != DatasetConfig.DatasetType.EXTERNAL) {
+ return true;
+ }
+
+ ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ prefix = new ExternalDataPrefix(edd.getProperties());
+
+ return !prefix.hasComputedFields() || scanOpTag != LogicalOperatorTag.DATASOURCESCAN
+ || !DatasetUtil.isFilterPushdownSupported(dataset);
+ }
+
+ @Override
+ protected void preparePushdown(UseDescriptor useDescriptor) throws AlgebricksException {
+ super.preparePushdown(useDescriptor);
+ }
+
+ @Override
+ protected boolean isPushable(AbstractFunctionCallExpression expression) {
+ FunctionIdentifier fid = expression.getFunctionIdentifier();
+ return !ARRAY_FUNCTIONS.contains(fid) && super.isPushable(expression);
+ }
+
+ @Override
+ protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+ IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+ if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+ return false;
+ }
+
+ // The inferred path from the provided expression
+ ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+ if (prefix.getPaths().contains(expressionPath)) {
+ // The expression refer to a declared computed field. Add it to the filter paths
+ paths.put(expression, expressionPath);
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
index 85da569..6e3556c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
@@ -21,6 +21,7 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -29,18 +30,17 @@
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
-import org.apache.hyracks.util.string.UTF8StringUtil;
import org.apache.hyracks.util.string.UTF8StringWriter;
class ExternalFilterValueEvaluator implements IExternalFilterValueEvaluator {
private final ATypeTag typeTag;
private final ArrayBackedValueStorage value;
- private final UTF8StringWriter utf8StringWriter;
+ private final AStringSerializerDeserializer stringSerDer;
ExternalFilterValueEvaluator(ATypeTag typeTag) {
this.typeTag = typeTag;
value = new ArrayBackedValueStorage();
- utf8StringWriter = new UTF8StringWriter();
+ stringSerDer = new AStringSerializerDeserializer(new UTF8StringWriter(), null);
}
@Override
@@ -58,7 +58,7 @@
result.set(value);
}
- private void writeValue(ATypeTag typeTag, String stringValue) throws IOException {
+ private void writeValue(ATypeTag typeTag, String stringValue) throws HyracksDataException {
DataOutput output = value.getDataOutput();
SerializerDeserializerUtil.serializeTag(typeTag, output);
switch (typeTag) {
@@ -70,7 +70,7 @@
case DOUBLE:
DoubleSerializerDeserializer.write(Double.parseDouble(stringValue), output);
case STRING:
- UTF8StringUtil.writeUTF8(stringValue, output, utf8StringWriter);
+ stringSerDer.serialize(stringValue, output);
}
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
index 814b8cb..5ac4794 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
@@ -38,13 +38,22 @@
private static final long serialVersionUID = 1L;
+ /**
+ * Using this singleton object may instantiate too many objects
+ *
+ * @deprecated use {{@link #AStringSerializerDeserializer(UTF8StringWriter, UTF8StringReader)}}
+ */
+ @Deprecated
public static final AStringSerializerDeserializer INSTANCE = new AStringSerializerDeserializer();
private final UTF8StringWriter utf8StringWriter;
private final UTF8StringReader utf8StringReader;
private AStringSerializerDeserializer() {
- this.utf8StringWriter = null;
- this.utf8StringReader = null;
+ this(null, null);
+ }
+
+ public AStringSerializerDeserializer(UTF8StringWriter utf8StringWriter) {
+ this(utf8StringWriter, null);
}
public AStringSerializerDeserializer(UTF8StringWriter utf8StringWriter, UTF8StringReader utf8StringReader) {
@@ -63,8 +72,12 @@
@Override
public void serialize(AString instance, DataOutput out) throws HyracksDataException {
+ serialize(instance.getStringValue(), out);
+ }
+
+ public void serialize(String value, DataOutput out) throws HyracksDataException {
try {
- UTF8StringUtil.writeUTF8(instance.getStringValue(), out, utf8StringWriter);
+ UTF8StringUtil.writeUTF8(value, out, utf8StringWriter);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index 13fc6e2..a1d22ac 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -131,7 +131,7 @@
}
if (filterExpression != null) {
- generator.writeStringField("filter-on", filterExpression.toString());
+ generator.writeStringField("prefix-filter-on", filterExpression.toString());
}
}