[ASTERIXDB-3229][EXT] Part 5: Introduce external filter interfaces
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
A prep patch for dynamic prefixes
Change-Id: Icaa3958ac1af11ef5f63ba125b7ce5858b04112c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17696
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
index 65b415d..a8baf48 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
@@ -60,7 +60,7 @@
/**
* Creates an instance of IDatasourceAdapter.
*
- * @param ctx HyracksTaskContext
+ * @param ctx HyracksTaskContext
* @param partition partition number
* @return An instance of IDatasourceAdapter.
* @throws Exception
@@ -72,10 +72,12 @@
*
* @param serviceContext
* @param configuration
- * @param warningCollector warning collector
+ * @param warningCollector warning collector
+ * @param filterEvaluatorFactory
* @throws AlgebricksException
* @throws HyracksDataException
*/
void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
- IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException;
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws HyracksDataException, AlgebricksException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
new file mode 100644
index 0000000..22cd20a
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IExternalFilterEvaluator {
+ boolean isEmpty();
+
+ boolean isComputedFieldUsed(int index);
+
+ void setValue(int index, String stringValue) throws HyracksDataException;
+
+ boolean evaluate() throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
new file mode 100644
index 0000000..38a38a6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public interface IExternalFilterEvaluatorFactory extends Serializable {
+ IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+ throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluator.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluator.java
new file mode 100644
index 0000000..78ebeb4
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class NoOpExternalFilterEvaluator implements IExternalFilterEvaluator {
+ static final IExternalFilterEvaluator INSTANCE = new NoOpExternalFilterEvaluator();
+
+ private NoOpExternalFilterEvaluator() {
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
+
+ @Override
+ public boolean isComputedFieldUsed(int index) {
+ return false;
+ }
+
+ @Override
+ public void setValue(int index, String stringValue) throws HyracksDataException {
+ throw new IndexOutOfBoundsException("Number of paths is 0");
+ }
+
+ @Override
+ public boolean evaluate() throws HyracksDataException {
+ return true;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluatorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluatorFactory.java
new file mode 100644
index 0000000..4b5bebb
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/NoOpExternalFilterEvaluatorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class NoOpExternalFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+ public static final IExternalFilterEvaluatorFactory INSTANCE = new NoOpExternalFilterEvaluatorFactory();
+ private static final long serialVersionUID = 6470035020297216949L;
+
+ private NoOpExternalFilterEvaluatorFactory() {
+ }
+
+ @Override
+ public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+ throws HyracksDataException {
+ return NoOpExternalFilterEvaluator.INSTANCE;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
index 45c4b12..9741bb1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
@@ -24,6 +24,8 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
@@ -66,7 +68,7 @@
@Override
public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceContext = serviceContext;
this.configuration = configuration;
}
@@ -92,7 +94,8 @@
ITypedAdapterFactory adapterFactory = (ITypedAdapterFactory) cl.loadClass(className).newInstance();
adapterFactory.setOutputType(outputType);
adapterFactory.setMetaType(metaType);
- adapterFactory.configure(null, configuration, ctx.getWarningCollector());
+ adapterFactory.configure(null, configuration, ctx.getWarningCollector(),
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
return adapterFactory.createAdapter(ctx, partition);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | AlgebricksException e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index fb08586..89ee37b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -25,6 +25,8 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
@@ -118,7 +120,8 @@
if (dataSourceFactory == null) {
dataSourceFactory = createExternalDataSourceFactory(configuration);
// create and configure parser factory
- dataSourceFactory.configure(serviceContext, configuration, warningCollector);
+ dataSourceFactory.configure(serviceContext, configuration, warningCollector,
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
}
if (dataParserFactory == null) {
// create and configure parser factory
@@ -131,12 +134,13 @@
@Override
public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
- IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
ICcApplicationContext appCtx = (ICcApplicationContext) serviceContext.getApplicationContext();
ExternalDataUtils.validateDataSourceParameters(configuration);
dataSourceFactory = createExternalDataSourceFactory(configuration);
- dataSourceFactory.configure(serviceContext, configuration, warningCollector);
+ dataSourceFactory.configure(serviceContext, configuration, warningCollector, filterEvaluatorFactory);
ExternalDataUtils.validateDataParserParameters(configuration);
dataParserFactory = createDataParserFactory(configuration);
dataParserFactory.setRecordType(recordType);
@@ -199,10 +203,8 @@
/**
* Use pre-configured datasource factory For function datasources
*
- * @param dataSourceFactory
- * the function datasource factory
- * @param dataParserFactory
- * the function data parser factory
+ * @param dataSourceFactory the function datasource factory
+ * @param dataParserFactory the function data parser factory
* @throws AlgebricksException
*/
public void configure(IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory)
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index e5c4b3f..d628bc7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -62,10 +63,11 @@
* submitted AQL statement and any additional pairs added by the compiler
*
* @param configuration
+ * @param filterEvaluatorFactory
* @throws AsterixException
*/
- void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException;
+ void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException;
/**
* returns the passed partition constraints if not null, otherwise returns round robin absolute partition
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index f22d128..d7093b9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
@@ -76,7 +77,6 @@
protected static Object initLock = new Object();
protected Map<String, String> configuration;
protected Class<?> recordClass;
- protected boolean indexingOp = false;
private JobConf conf;
private InputSplit[] inputSplits;
private String nodeName;
@@ -84,7 +84,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
JobConf hdfsConf = createHdfsConf(serviceCtx, configuration);
configureHdfsConf(hdfsConf, configuration);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index 7313b31..ff6b03e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -26,6 +26,7 @@
import java.util.regex.Matcher;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -62,8 +63,8 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
this.configuration = configuration;
this.partitionConstraint =
((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 3043f7a..4fc63c6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -25,6 +25,7 @@
import java.util.PriorityQueue;
import java.util.function.Supplier;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -51,9 +52,9 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
- super.configure(ctx, configuration, warningCollector);
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
+ super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
index 1dd0c8b..6a21c79 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -59,13 +60,13 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
this.configuration = configuration;
// Stream factory
streamFactory = new AwsS3InputStreamFactory();
- streamFactory.configure(ctx, configuration, warningCollector);
+ streamFactory.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
// record reader
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 93f1e69..66312bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -53,7 +54,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
//Get path
String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
? configuration.get(ExternalDataConstants.KEY_PATH) : buildPathURIs(configuration, warningCollector);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index 55c0521..b18b655 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -27,6 +27,7 @@
import java.util.PriorityQueue;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -52,9 +53,9 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
- super.configure(ctx, configuration, warningCollector);
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
+ super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
// Ensure the validity of include/exclude
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
index 525ee63..0f4d6ba 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -59,13 +60,13 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
this.configuration = configuration;
// Stream factory
streamFactory = new AzureBlobInputStreamFactory();
- streamFactory.configure(ctx, configuration, warningCollector);
+ streamFactory.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
// record reader
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index 929cb6e..35c3648 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -27,6 +27,7 @@
import java.util.PriorityQueue;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -52,9 +53,9 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
- super.configure(ctx, configuration, warningCollector);
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
+ super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
// Ensure the validity of include/exclude
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
index 594bacf..6f4685c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -59,13 +60,13 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
this.configuration = configuration;
// Stream factory
streamFactory = new AzureDataLakeInputStreamFactory();
- streamFactory.configure(ctx, configuration, warningCollector);
+ streamFactory.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
// record reader
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index e08013c..927e74e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -50,7 +51,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
//Get endpoint
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index c98fc8b..3ef9c56 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -50,7 +51,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
DataLakeServiceClient dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index 278c1ad..433fecd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -47,9 +48,9 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
- super.configure(ctx, configuration, warningCollector);
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
+ super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
@@ -65,15 +66,15 @@
/**
* To efficiently utilize the parallelism, work load will be distributed amongst the partitions based on the file
* size.
- *
+ * <p>
* Example:
* File1 1mb, File2 300kb, File3 300kb, File4 300kb
- *
+ * <p>
* Distribution:
* Partition1: [File1]
* Partition2: [File2, File3, File4]
*
- * @param items items
+ * @param items items
* @param partitionsCount Partitions count
*/
private void distributeWorkLoad(List<Blob> items, int partitionsCount) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSReaderFactory.java
index ca42892..981a29d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSReaderFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -59,13 +60,13 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
this.configuration = configuration;
// Stream factory
streamFactory = new GCSInputStreamFactory();
- streamFactory.configure(ctx, configuration, warningCollector);
+ streamFactory.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
// record reader
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 2887415..1de944b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -24,6 +24,7 @@
import java.util.Set;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -45,7 +46,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AlgebricksException, HyracksDataException {
// get path
String path = buildPathURIs(configuration, warningCollector);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
index 5954d74..b5ade31 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -87,8 +88,8 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException {
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
this.configurations = configuration;
// necessary configs
addrValue = getConfigurationValue(KEY_CONFIGURATION_ADDRESSES, true);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index 1a5d2a2..0026f07 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -26,6 +26,7 @@
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -62,7 +63,7 @@
@Override
public void configure(IServiceContext serviceContext, Map<String, String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceContext = serviceContext;
String url = configuration.get(ExternalDataConstants.KEY_RSS_URL);
if (url == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index c0a1b38..d9bbd01 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -83,10 +84,11 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
configureInputStreamFactory(configuration);
- streamFactory.configure(serviceCtx, configuration, warningCollector);
+ streamFactory.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 52d8c03..ef55d4a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -80,7 +81,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AsterixException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AsterixException {
try {
Class.forName("twitter4j.Twitter");
} catch (ClassNotFoundException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index cde0266..4f07a78 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.INodeResolver;
@@ -87,7 +88,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AsterixException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AsterixException {
this.configuration = configuration;
String[] splits = configuration.get(ExternalDataConstants.KEY_PATH).split(",");
if (inputFileSplits == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index 5d2b2a6..ceabbfc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IInputStreamFactory;
@@ -57,7 +58,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AsterixException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws AsterixException {
try {
this.serviceCtx = serviceCtx;
this.sockets = new ArrayList<>();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index d628062..c8da459 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.SocketServerInputStream;
@@ -45,7 +46,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws CompilationException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws CompilationException {
try {
sockets = FeedUtils.extractHostsPorts(configuration.get(ExternalDataConstants.KEY_MODE), serviceCtx,
configuration.get(ExternalDataConstants.KEY_SOCKETS));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index ab7fa77..7a807a4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
@@ -86,7 +87,7 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceCtx = serviceCtx;
this.configuration = configuration;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 27ae55c..c70cff4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
@@ -55,14 +56,20 @@
private static final Logger LOGGER = LogManager.getLogger();
- /** The unique identifier of the feed that is being ingested. **/
+ /**
+ * The unique identifier of the feed that is being ingested.
+ **/
private final EntityId feedId;
private final FeedPolicyAccessor policyAccessor;
private final ARecordType adapterOutputType;
- /** The adaptor factory that is used to create an instance of the feed adaptor **/
+ /**
+ * The adaptor factory that is used to create an instance of the feed adaptor
+ **/
private ITypedAdapterFactory adaptorFactory;
- /** The library that contains the adapter in use. **/
+ /**
+ * The library that contains the adapter in use.
+ **/
private DataverseName adaptorLibraryDataverse;
private String adaptorLibraryName;
/**
@@ -70,7 +77,9 @@
* This value is used only in the case of external adapters.
**/
private String adaptorFactoryClassName;
- /** The configuration parameters associated with the adapter. **/
+ /**
+ * The configuration parameters associated with the adapter.
+ **/
private Map<String, String> adaptorConfiguration;
public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, ITypedAdapterFactory adapterFactory,
@@ -120,7 +129,8 @@
try {
adapterFactory = (ITypedAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance());
adapterFactory.setOutputType(adapterOutputType);
- adapterFactory.configure(null, adaptorConfiguration, ctx.getWarningCollector());
+ adapterFactory.configure(null, adaptorConfiguration, ctx.getWarningCollector(),
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 4b0148e..3f7ae0e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.types.ARecordType;
@@ -40,7 +41,8 @@
// get adapter factory. this method has the side effect of modifying the configuration as necessary
public static ITypedAdapterFactory getAdapterFactory(ICCServiceContext serviceCtx, String adapterName,
Map<String, String> configuration, ARecordType itemType, ARecordType metaType,
- IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+ throws HyracksDataException, AlgebricksException {
ExternalDataUtils.defaultConfiguration(configuration);
ExternalDataUtils.prepare(adapterName, configuration);
ICcApplicationContext context = (ICcApplicationContext) serviceCtx.getApplicationContext();
@@ -48,7 +50,7 @@
(ITypedAdapterFactory) context.getAdapterFactoryService().createAdapterFactory();
adapterFactory.setOutputType(itemType);
adapterFactory.setMetaType(metaType);
- adapterFactory.configure(serviceCtx, configuration, warningCollector);
+ adapterFactory.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
return adapterFactory;
}
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
index c65e00d..4f36915 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -50,7 +51,7 @@
@Override
public void configure(IServiceContext serviceCtx, final Map<String, String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceCtx = serviceCtx;
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index 49ca23e..a21aeb6 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -60,7 +61,7 @@
@Override
public void configure(IServiceContext serviceCtx, final Map<String, String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.serviceCtx = serviceCtx;
if (configuration.containsKey("num-of-records")) {
numOfRecords = Integer.parseInt(configuration.get("num-of-records"));
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index f36e35b..651b190 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataflow.TupleForwarder;
import org.apache.asterix.external.parser.ADMDataParser;
@@ -106,7 +107,7 @@
@Override
public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
- IWarningCollector warningCollector) {
+ IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) {
this.configuration = configuration;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 9d4d508..25eab7f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -28,6 +28,7 @@
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -139,7 +140,8 @@
properties = addSubPath(externalDataSource.getProperties(), properties);
properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, String.valueOf(externalScanBufferSize));
ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
- edd.getAdapter(), properties, (ARecordType) itemType, null, context.getWarningCollector());
+ edd.getAdapter(), properties, (ARecordType) itemType, context.getWarningCollector(),
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
return metadataProvider.getExternalDatasetScanRuntime(jobSpec, itemType, adapterFactory,
tupleFilterFactory, outputLimit);
case INTERNAL:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
index 5503cb0..22bfb31 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.metadata.api.IDatasourceFunction;
@@ -52,8 +53,8 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
// No Op
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index ef95511..222ce81 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.metadata.entities.Dataset;
@@ -144,7 +145,8 @@
LoadableDataSource alds = (LoadableDataSource) dataSource;
ARecordType itemType = (ARecordType) alds.getLoadedType();
ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
- alds.getAdapter(), alds.getAdapterProperties(), itemType, null, context.getWarningCollector());
+ alds.getAdapter(), alds.getAdapterProperties(), itemType, context.getWarningCollector(),
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
return metadataProvider.getLoadableDatasetScanRuntime(jobSpec, adapterFactory, rDesc);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c5b8ea3..e15063b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -43,6 +43,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.LockList;
@@ -899,13 +900,13 @@
}
protected ITypedAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
- Map<String, String> configuration, ARecordType itemType, ARecordType metaType,
- IWarningCollector warningCollector) throws AlgebricksException {
+ Map<String, String> configuration, ARecordType itemType, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
try {
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
dataset.getDataverseName().getCanonicalForm());
return AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName,
- configuration, itemType, metaType, warningCollector);
+ configuration, itemType, null, warningCollector, filterEvaluatorFactory);
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter", e);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 0ee9516..e9944ee 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory;
@@ -147,7 +148,8 @@
}
adapterFactory.setOutputType(adapterOutputType);
adapterFactory.setMetaType(metaType);
- adapterFactory.configure(appCtx.getServiceContext(), configuration, warningCollector);
+ adapterFactory.configure(appCtx.getServiceContext(), configuration, warningCollector,
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
if (metaType == null) {
@@ -227,10 +229,12 @@
}
adapterFactory.setOutputType(adapterOutputType);
adapterFactory.setMetaType(metaType);
- adapterFactory.configure(appCtx.getServiceContext(), configuration, NoOpWarningCollector.INSTANCE);
+ adapterFactory.configure(appCtx.getServiceContext(), configuration, NoOpWarningCollector.INSTANCE,
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
} else {
adapterFactory = AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(), adapterName,
- configuration, adapterOutputType, metaType, NoOpWarningCollector.INSTANCE);
+ configuration, adapterOutputType, metaType, NoOpWarningCollector.INSTANCE,
+ NoOpExternalFilterEvaluatorFactory.INSTANCE);
adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
}
if (metaType == null) {