[ASTERIXDB-3237][RT] Add external filter rewrite pass

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
This patch adds the rewrite pass to push down
predicates that contain expressions that access
external datasets' computed fields.

Change-Id: If3f907191431e4d06e0ebf23e0db9f1f95595ca7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17704
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index b8f66f7..3b298d0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -27,11 +27,12 @@
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
 import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownProcessorsExecutor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.InlineFilterExpressionsProcessor;
-import org.apache.asterix.optimizer.rules.pushdown.processor.PushdownProcessorsExecutor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -76,6 +77,7 @@
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
+        // TODO this should be revised after introducing the proper compiler flags
         if (!context.getPhysicalOptimizationConfig().isExternalFieldPushdown() || !run) {
             //The rule was fired, or value access pushdown is disabled
             return false;
@@ -109,9 +111,11 @@
         if (context.getPhysicalOptimizationConfig().isColumnFilterEnabled()) {
             // Performs filter pushdowns
             pushdownProcessorsExecutor.add(new ColumnFilterPushdownProcessor(pushdownContext, context));
-            // Perform range-filter pushdowns
+            // Performs range-filter pushdowns
             pushdownProcessorsExecutor.add(new ColumnRangeFilterPushdownProcessor(pushdownContext, context));
-            // Inline AND/OR expression
+            // Performs prefix pushdowns
+            pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+            // Inlines AND/OR expression (must be last to run)
             pushdownProcessorsExecutor.add(new InlineFilterExpressionsProcessor(pushdownContext, context));
         }
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
similarity index 97%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 4106337..1530f55 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.optimizer.rules.pushdown.processor;
+package org.apache.asterix.optimizer.rules.pushdown;
 
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 
@@ -28,8 +28,8 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
 import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index 4dbb06d..4b57617 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -69,7 +69,7 @@
      * @param scanDefineDescriptor data-scan descriptor
      * @return true to skip, false otherwise
      */
-    protected abstract boolean skip(ScanDefineDescriptor scanDefineDescriptor);
+    protected abstract boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException;
 
     /**
      * Prepare data-scan for a pushdown
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 00b2dc4..8043f32 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -73,7 +73,7 @@
     }
 
     @Override
-    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) {
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
         Dataset dataset = scanDefineDescriptor.getDataset();
         LogicalOperatorTag scanOpTag = scanDefineDescriptor.getOperator().getOperatorTag();
         /*
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
new file mode 100644
index 0000000..bb5c853
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ExternalDatasetFilterPushdownProcessor extends ColumnFilterPushdownProcessor {
+    private ExternalDataPrefix prefix;
+
+    public ExternalDatasetFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+        super(pushdownContext, context);
+    }
+
+    @Override
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
+        Dataset dataset = scanDefineDescriptor.getDataset();
+        LogicalOperatorTag scanOpTag = scanDefineDescriptor.getOperator().getOperatorTag();
+        if (dataset.getDatasetType() != DatasetConfig.DatasetType.EXTERNAL) {
+            return true;
+        }
+
+        ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+        prefix = new ExternalDataPrefix(edd.getProperties());
+
+        return !prefix.hasComputedFields() || scanOpTag != LogicalOperatorTag.DATASOURCESCAN
+                || !DatasetUtil.isFilterPushdownSupported(dataset);
+    }
+
+    @Override
+    protected void preparePushdown(UseDescriptor useDescriptor) throws AlgebricksException {
+        super.preparePushdown(useDescriptor);
+    }
+
+    @Override
+    protected boolean isPushable(AbstractFunctionCallExpression expression) {
+        FunctionIdentifier fid = expression.getFunctionIdentifier();
+        return !ARRAY_FUNCTIONS.contains(fid) && super.isPushable(expression);
+    }
+
+    @Override
+    protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+        IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+        if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+            return false;
+        }
+
+        // The inferred path from the provided expression
+        ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+        if (prefix.getPaths().contains(expressionPath)) {
+            // The expression refer to a declared computed field. Add it to the filter paths
+            paths.put(expression, expressionPath);
+            return true;
+        }
+        return false;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
index 85da569..6e3556c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
@@ -21,6 +21,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -29,18 +30,17 @@
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
-import org.apache.hyracks.util.string.UTF8StringUtil;
 import org.apache.hyracks.util.string.UTF8StringWriter;
 
 class ExternalFilterValueEvaluator implements IExternalFilterValueEvaluator {
     private final ATypeTag typeTag;
     private final ArrayBackedValueStorage value;
-    private final UTF8StringWriter utf8StringWriter;
+    private final AStringSerializerDeserializer stringSerDer;
 
     ExternalFilterValueEvaluator(ATypeTag typeTag) {
         this.typeTag = typeTag;
         value = new ArrayBackedValueStorage();
-        utf8StringWriter = new UTF8StringWriter();
+        stringSerDer = new AStringSerializerDeserializer(new UTF8StringWriter(), null);
     }
 
     @Override
@@ -58,7 +58,7 @@
         result.set(value);
     }
 
-    private void writeValue(ATypeTag typeTag, String stringValue) throws IOException {
+    private void writeValue(ATypeTag typeTag, String stringValue) throws HyracksDataException {
         DataOutput output = value.getDataOutput();
         SerializerDeserializerUtil.serializeTag(typeTag, output);
         switch (typeTag) {
@@ -70,7 +70,7 @@
             case DOUBLE:
                 DoubleSerializerDeserializer.write(Double.parseDouble(stringValue), output);
             case STRING:
-                UTF8StringUtil.writeUTF8(stringValue, output, utf8StringWriter);
+                stringSerDer.serialize(stringValue, output);
         }
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
index 814b8cb..5ac4794 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
@@ -38,13 +38,22 @@
 
     private static final long serialVersionUID = 1L;
 
+    /**
+     * Using this singleton object may instantiate too many objects
+     *
+     * @deprecated use {{@link #AStringSerializerDeserializer(UTF8StringWriter, UTF8StringReader)}}
+     */
+    @Deprecated
     public static final AStringSerializerDeserializer INSTANCE = new AStringSerializerDeserializer();
     private final UTF8StringWriter utf8StringWriter;
     private final UTF8StringReader utf8StringReader;
 
     private AStringSerializerDeserializer() {
-        this.utf8StringWriter = null;
-        this.utf8StringReader = null;
+        this(null, null);
+    }
+
+    public AStringSerializerDeserializer(UTF8StringWriter utf8StringWriter) {
+        this(utf8StringWriter, null);
     }
 
     public AStringSerializerDeserializer(UTF8StringWriter utf8StringWriter, UTF8StringReader utf8StringReader) {
@@ -63,8 +72,12 @@
 
     @Override
     public void serialize(AString instance, DataOutput out) throws HyracksDataException {
+        serialize(instance.getStringValue(), out);
+    }
+
+    public void serialize(String value, DataOutput out) throws HyracksDataException {
         try {
-            UTF8StringUtil.writeUTF8(instance.getStringValue(), out, utf8StringWriter);
+            UTF8StringUtil.writeUTF8(value, out, utf8StringWriter);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index 13fc6e2..a1d22ac 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -131,7 +131,7 @@
         }
 
         if (filterExpression != null) {
-            generator.writeStringField("filter-on", filterExpression.toString());
+            generator.writeStringField("prefix-filter-on", filterExpression.toString());
         }
     }