[ASTERIXDB-3255][RT] Prepare for embedding values in Parquet
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Preparing for embedding computed fields' values in Parquet.
This patch introduces IExternalDataRuntimeContext, which
can have different implementations and is passed to the
factories when creating parsers, readers, and/or input streams.
Change-Id: Idde191ab6d99c48022bb65bec648e83e249e4b75
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17767
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/PrefixComputedFieldsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/PrefixComputedFieldsTest.java
index efa4c79..b8cc7fa 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/PrefixComputedFieldsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/PrefixComputedFieldsTest.java
@@ -24,17 +24,21 @@
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.junit.Test;
import junit.framework.TestCase;
public class PrefixComputedFieldsTest extends TestCase {
+ private final Map<String, String> configuration = new HashMap<>();
@Test
public void test() throws Exception {
@@ -49,7 +53,7 @@
assertTrue(prefix.getIndexToComputedFieldsMap().isEmpty());
String prefix1 = "";
- prefix = new ExternalDataPrefix(prefix1);
+ prefix = createExternalDataPrefix(prefix1);
assertEquals("", prefix.getOriginal());
assertEquals("", prefix.getRoot());
assertFalse(prefix.isEndsWithSlash());
@@ -60,7 +64,7 @@
assertTrue(prefix.getIndexToComputedFieldsMap().isEmpty());
String prefix2 = "hotel";
- prefix = new ExternalDataPrefix(prefix2);
+ prefix = createExternalDataPrefix(prefix2);
assertEquals("hotel", prefix.getOriginal());
assertEquals("hotel", prefix.getRoot());
assertFalse(prefix.isEndsWithSlash());
@@ -71,7 +75,7 @@
assertTrue(prefix.getIndexToComputedFieldsMap().isEmpty());
String prefix3 = "hotel/{hotel-id:inT}/";
- prefix = new ExternalDataPrefix(prefix3);
+ prefix = createExternalDataPrefix(prefix3);
assertEquals("hotel/{hotel-id:inT}/", prefix.getOriginal());
assertEquals("hotel/", prefix.getRoot());
assertTrue(prefix.isEndsWithSlash());
@@ -82,7 +86,7 @@
assertEquals("(.+)", prefix.getIndexToComputedFieldsMap().get(1).getExpression());
String prefix4 = "hotel/{hotel-id:int}-{hotel-name:sTRing}";
- prefix = new ExternalDataPrefix(prefix4);
+ prefix = createExternalDataPrefix(prefix4);
assertEquals("hotel/{hotel-id:int}-{hotel-name:sTRing}", prefix.getOriginal());
assertEquals("hotel", prefix.getRoot());
assertFalse(prefix.isEndsWithSlash());
@@ -93,7 +97,7 @@
assertEquals("(.+)-(.+)", prefix.getIndexToComputedFieldsMap().get(1).getExpression());
String prefix5 = "hotel/something/{hotel-id:int}-{hotel-name:sTRing}/review/{year:int}-{month:int}-{day:int}/";
- prefix = new ExternalDataPrefix(prefix5);
+ prefix = createExternalDataPrefix(prefix5);
assertEquals("hotel/something/{hotel-id:int}-{hotel-name:sTRing}/review/{year:int}-{month:int}-{day:int}/",
prefix.getOriginal());
assertEquals("hotel/something/", prefix.getRoot());
@@ -107,7 +111,7 @@
assertEquals("(.+)-(.+)-(.+)", prefix.getIndexToComputedFieldsMap().get(4).getExpression());
String prefix6 = "hotel/something/{hotel-id:int}-{hotel-name:sTRing}/review/{year:int}/{month:int}/{day:int}";
- prefix = new ExternalDataPrefix(prefix6);
+ prefix = createExternalDataPrefix(prefix6);
assertEquals("hotel/something/{hotel-id:int}-{hotel-name:sTRing}/review/{year:int}/{month:int}/{day:int}",
prefix.getOriginal());
assertEquals("hotel/something", prefix.getRoot());
@@ -123,7 +127,7 @@
assertEquals("(.+)", prefix.getIndexToComputedFieldsMap().get(6).getExpression());
String prefix7 = "hotel/{hotel.details.id:int}-{hotel-name:sTRing}";
- prefix = new ExternalDataPrefix(prefix7);
+ prefix = createExternalDataPrefix(prefix7);
assertEquals("hotel/{hotel.details.id:int}-{hotel-name:sTRing}", prefix.getOriginal());
assertEquals("hotel", prefix.getRoot());
assertFalse(prefix.isEndsWithSlash());
@@ -134,7 +138,7 @@
String prefix8 =
"hotel/hotel-{hotel-id:int}-hotel-{hotel-name:sTRing}/review/year-{year:int}/{month:int}-month/day-{day:int}-day";
- prefix = new ExternalDataPrefix(prefix8);
+ prefix = createExternalDataPrefix(prefix8);
assertEquals(
"hotel/hotel-{hotel-id:int}-hotel-{hotel-name:sTRing}/review/year-{year:int}/{month:int}-month/day-{day:int}-day",
prefix.getOriginal());
@@ -175,4 +179,10 @@
System.out.println("\n");
}
}
+
+ private ExternalDataPrefix createExternalDataPrefix(String path) throws AlgebricksException {
+ configuration.clear();
+ configuration.put(ExternalDataConstants.KEY_PATH, path);
+ return new ExternalDataPrefix(configuration);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEmbeddedValueInformation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEmbeddedValueInformation.java
index 9ee37d8..39225cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEmbeddedValueInformation.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEmbeddedValueInformation.java
@@ -22,7 +22,7 @@
String[] getEmbeddedFieldNames();
- boolean IsMissingEmbeddedValues();
+ boolean isMissingEmbeddedValues();
boolean isMissing(String fieldName);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index 8eb4475..43e822e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@ -21,7 +21,6 @@
import java.io.InputStream;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,7 +29,6 @@
protected AbstractFeedDataFlowController controller;
protected IFeedLogManager logManager;
protected IStreamNotificationHandler notificationHandler;
- protected IExternalFilterValueEmbedder valueEmbedder;
public abstract boolean stop() throws Exception;
@@ -50,10 +48,6 @@
this.notificationHandler = notificationHandler;
}
- public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
- this.valueEmbedder = valueEmbedder;
- }
-
public String getStreamName() {
return "";
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index c1d6797..5dbc383 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -24,7 +24,6 @@
import org.apache.asterix.builders.OrderedListBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AMutableOrderedList;
import org.apache.asterix.om.base.AMutableRecord;
@@ -39,9 +38,6 @@
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public interface IDataParser {
- default void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
- // NoOp
- }
/*
* The following two static methods are expensive. right now, they are used by RSSFeeds and
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataRuntimeContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataRuntimeContext.java
new file mode 100644
index 0000000..00a9919
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataRuntimeContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A runtime context, where different {@link IRecordReader} and {@link IDataParser} can have different implementations
+ * of this interface. This context can be created by {@link IExternalDataSourceFactory}.
+ */
+public interface IExternalDataRuntimeContext {
+ IHyracksTaskContext getTaskContext();
+
+ int getPartition();
+
+ IExternalFilterValueEmbedder getValueEmbedder();
+
+ Supplier<String> getDatasourceNameSupplier();
+
+ LongSupplier getLineNumberSupplier();
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 058fe89..6be46ac 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -28,11 +28,11 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.provider.context.DefaultExternalRuntimeDataContext;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -71,8 +71,9 @@
void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException;
- default IExternalFilterValueEmbedder createFilterValueEmbedder(IWarningCollector warningCollector) {
- return NoOpFilterValueEmbedder.INSTANCE;
+ default IExternalDataRuntimeContext createExternalDataRuntimeContext(IHyracksTaskContext context, int partition)
+ throws HyracksDataException {
+ return new DefaultExternalRuntimeDataContext(context, partition);
}
/**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamFactory.java
index a460e49..b430cf0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamFactory.java
@@ -18,15 +18,14 @@
*/
package org.apache.asterix.external.api;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IInputStreamFactory extends IExternalDataSourceFactory {
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+ AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException;
@Override
- public default DataSourceType getDataSourceType() {
+ default DataSourceType getDataSourceType() {
return DataSourceType.STREAM;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index f544ca0..e4ab8bd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -19,18 +19,8 @@
package org.apache.asterix.external.api;
import java.io.IOException;
-import java.util.function.LongSupplier;
@FunctionalInterface
public interface IRecordConverter<I, O> {
-
- public O convert(IRawRecord<? extends I> input) throws IOException;
-
- /**
- * Configures the converter with information suppliers from the {@link IRecordReader} data source.
- *
- * @param lineNumber line number supplier
- */
- default void configure(LongSupplier lineNumber) {
- }
+ O convert(IRawRecord<? extends I> input) throws IOException;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index 31e1764..2cb4c4b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -19,8 +19,6 @@
package org.apache.asterix.external.api;
import java.io.DataOutput;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,14 +35,5 @@
* @return true if the record was parsed successfully and written to out. False, otherwise.
* @throws HyracksDataException HyracksDataException
*/
- public boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-
- /**
- * Configures the parser with information suppliers from the {@link IRecordReader} data source.
- *
- * @param dataSourceName data source name supplier
- * @param lineNumber line number supplier
- */
- default void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
- }
+ boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
index 2ddbbcd..b9032a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
@@ -19,16 +19,15 @@
package org.apache.asterix.external.api;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IRecordDataParserFactory<T> extends IDataParserFactory {
- public IRecordDataParser<T> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException;
+ IRecordDataParser<T> createRecordParser(IExternalDataRuntimeContext context) throws HyracksDataException;
- public Class<?> getRecordClass();
+ Class<?> getRecordClass();
@Override
- public default DataSourceType getDataSourceType() {
+ default DataSourceType getDataSourceType() {
return DataSourceType.RECORDS;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index b7d4c85..b80e46c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -25,7 +25,6 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -75,10 +74,6 @@
*/
public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException;
- default void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
- // NoOp
- }
-
/**
* gives the record reader a chance to recover from IO errors during feed intake
*/
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index 8849508..64be02e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -22,12 +22,11 @@
import java.util.Set;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory {
- IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+ IRecordReader<? extends T> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException;
Class<?> getRecordClass();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
index ad9acc6..7b56319 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
@@ -18,11 +18,9 @@
*/
package org.apache.asterix.external.api;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IStreamDataParserFactory extends IDataParserFactory {
- public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException;
+ IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index d7093b9..46afdbe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -33,14 +33,17 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
import org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
import org.apache.asterix.external.input.stream.HDFSInputStream;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.context.ExternalStreamRuntimeDataContext;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
@@ -81,19 +84,21 @@
private InputSplit[] inputSplits;
private String nodeName;
private Class recordReaderClazz;
+ private IExternalFilterEvaluatorFactory filterEvaluatorFactory;
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
- JobConf hdfsConf = createHdfsConf(serviceCtx, configuration);
+ JobConf hdfsConf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
configureHdfsConf(hdfsConf, configuration);
}
- protected JobConf createHdfsConf(IServiceContext serviceCtx, Map<String, String> configuration)
- throws HyracksDataException {
+ protected JobConf prepareHDFSConf(IServiceContext serviceCtx, Map<String, String> configuration,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws HyracksDataException {
this.serviceCtx = serviceCtx;
this.configuration = configuration;
+ this.filterEvaluatorFactory = filterEvaluatorFactory;
init((ICCServiceContext) serviceCtx);
return HDFSUtils.configureHDFSJobConf(configuration);
}
@@ -210,9 +215,10 @@
* 2. Indexing Stream Record Reader: When we transform the input into a byte stream and perform indexing.
* 3. HDFS Record Reader: When we simply pass the Writable object as it is to the parser.
*/
+ @SuppressWarnings("unchecked")
@Override
- public IRecordReader<? extends Object> createRecordReader(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
+ public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+ IHyracksTaskContext ctx = context.getTaskContext();
try {
if (recordReaderClazz != null) {
StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
@@ -231,8 +237,7 @@
*/
readerConf = confFactory.getConf();
}
- return createRecordReader(configuration, read, inputSplits, readSchedule, nodeName, readerConf,
- ctx.getWarningCollector());
+ return createRecordReader(configuration, read, inputSplits, readSchedule, nodeName, readerConf, context);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -248,11 +253,19 @@
return recordReaderNames;
}
- private static IRecordReader<? extends Object> createRecordReader(Map<String, String> configuration, boolean[] read,
+ @Override
+ public IExternalDataRuntimeContext createExternalDataRuntimeContext(IHyracksTaskContext context, int partition) {
+ IExternalFilterValueEmbedder valueEmbedder =
+ filterEvaluatorFactory.createValueEmbedder(context.getWarningCollector());
+ return new ExternalStreamRuntimeDataContext(context, partition, valueEmbedder);
+ }
+
+ private static IRecordReader<?> createRecordReader(Map<String, String> configuration, boolean[] read,
InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
- IWarningCollector warningCollector) {
+ IExternalDataRuntimeContext context) {
if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT.trim())
.equals(ExternalDataConstants.INPUT_FORMAT_PARQUET)) {
+ IWarningCollector warningCollector = context.getTaskContext().getWarningCollector();
return new ParquetFileRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, warningCollector);
} else {
return new HDFSRecordReader<>(read, inputSplits, readSchedule, nodeName, conf);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpFilterValueEmbedder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpFilterValueEmbedder.java
index fe15387..e4a9131 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpFilterValueEmbedder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpFilterValueEmbedder.java
@@ -45,17 +45,12 @@
}
@Override
- public boolean shouldEmbed(IValueReference fieldName, ATypeTag typeTag) {
- return false;
- }
-
- @Override
public IValueReference getEmbeddedValue() {
throw new IllegalAccessError("Cannot embed a value to " + this.getClass().getName());
}
@Override
- public boolean IsMissingEmbeddedValues() {
+ public boolean isMissingEmbeddedValues() {
return false;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/EmbeddedValueBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/EmbeddedValueBuilder.java
index ca6921b..5ffd7d9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/EmbeddedValueBuilder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/EmbeddedValueBuilder.java
@@ -60,7 +60,7 @@
}
void build(String path) throws HyracksDataException {
- List<String> values = prefix.getValues(path);
+ List<String> values = prefix.getValues(prefix.removeProtocolContainerPair(path));
build(allPaths, values);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/ExternalFilterValueEmbedder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/ExternalFilterValueEmbedder.java
index 358cb64..2e9864b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/ExternalFilterValueEmbedder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/ExternalFilterValueEmbedder.java
@@ -34,7 +34,7 @@
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.data.std.api.IValueReference;
-public final class ExternalFilterValueEmbedder implements IExternalFilterValueEmbedder {
+public class ExternalFilterValueEmbedder implements IExternalFilterValueEmbedder {
private final ARecordType allPaths;
private final IWarningCollector warningCollector;
private final Map<IAType, BitSet> setValues;
@@ -79,11 +79,6 @@
}
@Override
- public boolean shouldEmbed(IValueReference fieldName, ATypeTag typeTag) {
- throw new IllegalAccessError("Should not be invoked");
- }
-
- @Override
public IValueReference getEmbeddedValue() {
IValueReference value = builder.getValue(currentType);
if (currentType.getTypeTag() != ATypeTag.OBJECT) {
@@ -96,7 +91,7 @@
}
@Override
- public boolean IsMissingEmbeddedValues() {
+ public boolean isMissingEmbeddedValues() {
if (currentType.getTypeTag() == ATypeTag.MISSING) {
return false;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/IExternalFilterValueEmbedder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/IExternalFilterValueEmbedder.java
index e4623c5..91bf32c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/IExternalFilterValueEmbedder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/IExternalFilterValueEmbedder.java
@@ -30,8 +30,6 @@
boolean shouldEmbed(String fieldName, ATypeTag typeTag);
- boolean shouldEmbed(IValueReference fieldName, ATypeTag typeTag);
-
void enterObject();
void exitObject();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
index 8b930aa..ad59f75 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
@@ -23,6 +23,7 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
@@ -39,17 +40,19 @@
private final int valueIndex;
private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
private final CharArrayRecord record;
- private LongSupplier lineNumber = ExternalDataConstants.NO_LINES;
+ private final LongSupplier lineNumber;
public CSVToRecordWithMetadataAndPKConverter(final int valueIndex, final char delimiter, final ARecordType metaType,
final ARecordType recordType, final int[] keyIndicator, final int[] keyIndexes, final IAType[] keyTypes,
- IWarningCollector warningCollector) {
+ IExternalDataRuntimeContext context) {
+ IWarningCollector warningCollector = context.getTaskContext().getWarningCollector();
this.cursor = new FieldCursorForDelimitedDataParser(null, delimiter, ExternalDataConstants.QUOTE,
warningCollector, ExternalDataConstants.EMPTY_STRING);
this.record = new CharArrayRecord();
this.valueIndex = valueIndex;
this.recordWithMetadata = new RecordWithMetadataAndPK<>(record, metaType.getFieldTypes(), recordType,
keyIndicator, keyIndexes, keyTypes);
+ lineNumber = context.getLineNumberSupplier();
}
@Override
@@ -79,9 +82,4 @@
}
return recordWithMetadata;
}
-
- @Override
- public void configure(LongSupplier lineNumber) {
- this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
index a9e5bc7..1961500 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
@@ -22,12 +22,12 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordConverter;
import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
public class CSVWithRecordConverterFactory implements IRecordConverterFactory<char[], RecordWithMetadataAndPK<char[]>> {
@@ -41,9 +41,10 @@
private IAType[] keyTypes;
@Override
- public IRecordConverter<char[], RecordWithMetadataAndPK<char[]>> createConverter(IHyracksTaskContext ctx) {
+ public IRecordConverter<char[], RecordWithMetadataAndPK<char[]>> createConverter(
+ IExternalDataRuntimeContext context) {
return new CSVToRecordWithMetadataAndPKConverter(recordIndex, delimiter, metaType, recordType, keyIndicators,
- keyIndexes, keyTypes, ctx.getWarningCollector());
+ keyIndexes, keyTypes, context);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
index b228b94..eb4c8e1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
@@ -21,10 +21,10 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordConverter;
import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import com.couchbase.client.core.message.dcp.DCPRequest;
@@ -51,7 +51,8 @@
}
@Override
- public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter(IHyracksTaskContext ctx) {
+ public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter(
+ IExternalDataRuntimeContext context) {
return new DCPMessageToRecordConverter();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
index 875a331..96ab8ba 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
@@ -22,22 +22,22 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordConverter;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
public interface IRecordConverterFactory<I, O> extends Serializable {
- public IRecordConverter<I, O> createConverter(IHyracksTaskContext ctx);
+ IRecordConverter<I, O> createConverter(IExternalDataRuntimeContext context);
- public void configure(Map<String, String> configuration) throws AsterixException;
+ void configure(Map<String, String> configuration) throws AsterixException;
- public Class<?> getInputClass();
+ Class<?> getInputClass();
- public Class<?> getOutputClass();
+ Class<?> getOutputClass();
- public void setRecordType(ARecordType recordType);
+ void setRecordType(ARecordType recordType);
- public void setMetaType(ARecordType metaType);
+ void setMetaType(ARecordType metaType);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
index 8ef19ad..8b43cb7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.logging.log4j.LogManager;
@@ -36,11 +37,14 @@
// File fields
protected final List<String> filePaths;
+ private final IExternalFilterValueEmbedder valueEmbedder;
protected int nextFileIndex = 0;
- public AbstractExternalInputStream(Map<String, String> configuration, List<String> filePaths) {
+ public AbstractExternalInputStream(Map<String, String> configuration, List<String> filePaths,
+ IExternalFilterValueEmbedder valueEmbedder) {
this.configuration = configuration;
this.filePaths = filePaths;
+ this.valueEmbedder = valueEmbedder;
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index 34e3d2d..a1d5d0a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -28,8 +28,10 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.provider.context.ExternalStreamRuntimeDataContext;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -52,7 +54,7 @@
}
@Override
- public abstract AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+ public abstract AsterixInputStream createInputStream(IExternalDataRuntimeContext context)
throws HyracksDataException;
@Override
@@ -74,8 +76,10 @@
}
@Override
- public IExternalFilterValueEmbedder createFilterValueEmbedder(IWarningCollector warningCollector) {
- return filterEvaluatorFactory.createValueEmbedder(warningCollector);
+ public IExternalDataRuntimeContext createExternalDataRuntimeContext(IHyracksTaskContext context, int partition) {
+ IExternalFilterValueEmbedder valueEmbedder =
+ filterEvaluatorFactory.createValueEmbedder(context.getWarningCollector());
+ return new ExternalStreamRuntimeDataContext(context, partition, valueEmbedder);
}
public static class PartitionWorkLoadBasedOnSize implements Serializable {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index bbcf9cd..56feb8a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
@@ -50,8 +51,9 @@
private final S3Client s3Client;
private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
- public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
- super(configuration, filePaths);
+ public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths,
+ IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
+ super(configuration, filePaths, valueEmbedder);
this.s3Client = buildAwsS3Client(configuration);
this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 5b4e314..36d21d1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -26,13 +26,14 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -43,8 +44,11 @@
private static final long serialVersionUID = 1L;
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+ IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+ int partition = context.getPartition();
+ return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
+ valueEmbedder);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 98b5b06..f335538 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -83,7 +83,7 @@
//Configure Hadoop S3 input splits
try {
- JobConf conf = createHdfsConf(serviceCtx, configuration);
+ JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
int numberOfPartitions = getPartitionConstraint().getLocations().length;
configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
index bbfece2..52d16c1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,9 +47,9 @@
private final BlobServiceClient client;
private final String container;
- public AzureBlobInputStream(IApplicationContext appCtx, Map<String, String> configuration, List<String> filePaths)
- throws HyracksDataException {
- super(configuration, filePaths);
+ public AzureBlobInputStream(IApplicationContext appCtx, Map<String, String> configuration, List<String> filePaths,
+ IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
+ super(configuration, filePaths, valueEmbedder);
this.client = buildAzureClient(appCtx, configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index ada68ac..f31e823 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -29,12 +29,13 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -45,11 +46,13 @@
private static final long serialVersionUID = 1L;
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- IApplicationContext appCtx =
- (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+ public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+ IApplicationContext appCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+ .getServiceContext().getApplicationContext();
+ IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+ int partition = context.getPartition();
return new AzureBlobInputStream(appCtx, configuration,
- partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
index 7a95222..ba47d57 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -47,8 +48,8 @@
private final String container;
public AzureDataLakeInputStream(IApplicationContext appCtx, Map<String, String> configuration,
- List<String> filePaths) throws HyracksDataException {
- super(configuration, filePaths);
+ List<String> filePaths, IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
+ super(configuration, filePaths, valueEmbedder);
this.client = buildAzureClient(appCtx, configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index 89a1d80..41b041c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -29,11 +29,12 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -45,11 +46,13 @@
private static final long serialVersionUID = 1L;
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- IApplicationContext appCtx =
- (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+ public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+ IApplicationContext appCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+ .getServiceContext().getApplicationContext();
+ IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+ int partition = context.getPartition();
return new AzureDataLakeInputStream(appCtx, configuration,
- partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 25ac181..530ce74 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -79,7 +79,7 @@
putAzureBlobConfToHadoopConf(configuration, path);
// configure Hadoop Azure input splits
- JobConf conf = createHdfsConf(serviceCtx, configuration);
+ JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index 3ef9c56..4dedb08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -66,7 +66,7 @@
putAzureDataLakeConfToHadoopConf(configuration, path);
//Configure Hadoop Azure input splits
- JobConf conf = createHdfsConf(serviceCtx, configuration);
+ JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
index 4657bd0..ebc9907 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -49,8 +50,9 @@
private final String container;
private static final int MAX_ATTEMPTS = 5; // We try a total of 5 times in case of retryable errors
- public GCSInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
- super(configuration, filePaths);
+ public GCSInputStream(Map<String, String> configuration, List<String> filePaths,
+ IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
+ super(configuration, filePaths, valueEmbedder);
this.client = buildClient(configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index 74f6428..b6ad3cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -26,13 +26,14 @@
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -43,8 +44,11 @@
private static final long serialVersionUID = 1L;
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- return new GCSInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+ IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+ int partition = context.getPartition();
+ return new GCSInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
+ valueEmbedder);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 50a1039..17cad3e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -69,7 +69,7 @@
putGCSConfToHadoopConf(configuration, path);
// configure hadoop input splits
- JobConf conf = createHdfsConf(serviceCtx, configuration);
+ JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
int numberOfPartitions = getPartitionConstraint().getLocations().length;
GCSUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
index b5ade31..4736675 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -32,7 +33,6 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.http.server.HttpServerConfigBuilder;
@@ -54,10 +54,10 @@
private List<Pair<String, Integer>> serverAddrs;
@Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context)
throws HyracksDataException {
try {
- return new HttpServerRecordReader(serverAddrs.get(partition).getRight(), entryPoint, queueSize,
+ return new HttpServerRecordReader(serverAddrs.get(context.getPartition()).getRight(), entryPoint, queueSize,
HttpServerConfigBuilder.createDefault());
} catch (Exception e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index 0026f07..21ab53e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -34,7 +35,6 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -86,10 +86,10 @@
}
@Override
- public IRecordReader<? extends SyndEntry> createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends SyndEntry> createRecordReader(IExternalDataRuntimeContext context)
throws HyracksDataException {
try {
- return new RSSRecordReader(urls.get(partition));
+ return new RSSRecordReader(urls.get(context.getPartition()));
} catch (MalformedURLException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 9d12325..937f3fe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -31,7 +31,6 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IStreamNotificationHandler;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -49,11 +48,9 @@
protected IFeedLogManager feedLogManager;
private Supplier<String> dataSourceName = EMPTY_STRING;
private Supplier<String> previousDataSourceName = EMPTY_STRING;
- private AsterixInputStream inputStream;
public void configure(AsterixInputStream inputStream, Map<String, String> config) {
int bufferSize = ExternalDataUtils.getOrDefaultBufferSize(config);
- this.inputStream = inputStream;
this.reader = new AsterixInputStreamReader(inputStream, bufferSize);
record = new CharArrayRecord();
inputBuffer = new char[bufferSize];
@@ -123,11 +120,6 @@
return dataSourceName;
}
- @Override
- public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
- inputStream.setValueEmbedder(valueEmbedder);
- }
-
String getPreviousStreamName() {
return previousDataSourceName.get();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index c9567c5..d8168ce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -35,6 +36,7 @@
import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.context.ExternalReaderRuntimeDataContext;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -82,22 +84,19 @@
}
@Override
- public final IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ public final IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context)
throws HyracksDataException {
- try {
- StreamRecordReader streamRecordReader =
- (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
- streamRecordReader.configure(ctx, streamFactory.createInputStream(ctx, partition), configuration);
- return streamRecordReader;
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException
- | NoSuchMethodException e) {
- throw HyracksDataException.create(e);
- }
+ StreamRecordReader reader = createReader(context);
+ ((ExternalReaderRuntimeDataContext) context).setReader(reader);
+ return reader;
}
@Override
- public final IExternalFilterValueEmbedder createFilterValueEmbedder(IWarningCollector warningCollector) {
- return filterEvaluatorFactory.createValueEmbedder(warningCollector);
+ public final IExternalDataRuntimeContext createExternalDataRuntimeContext(IHyracksTaskContext context,
+ int partition) throws HyracksDataException {
+ IExternalFilterValueEmbedder valueEmbedder =
+ filterEvaluatorFactory.createValueEmbedder(context.getWarningCollector());
+ return new ExternalReaderRuntimeDataContext(context, partition, valueEmbedder);
}
@Override
@@ -118,4 +117,18 @@
throw new CompilationException(ErrorCode.FEED_UNKNOWN_ADAPTER_NAME);
}
}
+
+ @SuppressWarnings("unchecked")
+ private StreamRecordReader createReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+ try {
+ StreamRecordReader streamRecordReader =
+ (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
+ streamRecordReader.configure(context.getTaskContext(), streamFactory.createInputStream(context),
+ configuration);
+ return streamRecordReader;
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index ef55d4a..2f87074 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -37,7 +38,6 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.logging.log4j.LogManager;
@@ -127,7 +127,7 @@
}
@Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context)
throws HyracksDataException {
IRecordReader<? extends char[]> recordReader;
switch (configuration.get(ExternalDataConstants.KEY_READER)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 4f07a78..fd7907f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.INodeResolver;
import org.apache.asterix.external.api.INodeResolverFactory;
@@ -42,7 +43,6 @@
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.io.UnmanagedFileSplit;
@@ -66,10 +66,10 @@
private transient FileSystemWatcher watcher;
@Override
- public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+ public synchronized AsterixInputStream createInputStream(IExternalDataRuntimeContext context)
throws HyracksDataException {
if (watcher == null) {
- String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
+ String nodeName = context.getTaskContext().getJobletContext().getServiceContext().getNodeId();
ArrayList<Path> inputResources = new ArrayList<>();
for (int i = 0; i < inputFileSplits.length; i++) {
if (inputFileSplits[i].getNodeName().equals(nodeName)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index ceabbfc..12c70bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.SocketClientInputStream;
@@ -38,7 +39,6 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -84,9 +84,9 @@
}
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
try {
- return new SocketClientInputStream(sockets.get(partition));
+ return new SocketClientInputStream(sockets.get(context.getPartition()));
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index c8da459..7106489 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.SocketServerInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -35,7 +36,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -59,10 +59,10 @@
}
@Override
- public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+ public synchronized AsterixInputStream createInputStream(IExternalDataRuntimeContext context)
throws HyracksDataException {
try {
- Pair<String, Integer> socket = sockets.get(partition);
+ Pair<String, Integer> socket = sockets.get(context.getPartition());
ServerSocket server;
server = new ServerSocket();
server.bind(new InetSocketAddress(socket.getRight()));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index 7a807a4..d486228 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -26,11 +26,11 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -93,9 +93,9 @@
}
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
try {
- return new TwitterFirehoseInputStream(configuration, partition);
+ return new TwitterFirehoseInputStream(configuration, context.getPartition());
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
index d9e374f..de9962c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
@@ -30,12 +30,11 @@
import org.apache.asterix.builders.IAsterixListBuilder;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.parser.jackson.ADMToken;
import org.apache.asterix.external.parser.jackson.GeometryCoParser;
import org.apache.asterix.external.parser.jackson.ParserContext;
-import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.base.AUnorderedList;
@@ -69,27 +68,29 @@
protected final JsonFactory jsonFactory;
protected final ARecordType rootType;
protected final GeometryCoParser geometryCoParser;
- protected Supplier<String> dataSourceName = ExternalDataConstants.EMPTY_STRING;
- protected LongSupplier lineNumber = ExternalDataConstants.NO_LINES;
+ protected final Supplier<String> dataSourceName;
+ protected final LongSupplier lineNumber;
+ protected final IExternalFilterValueEmbedder valueEmbedder;
protected JsonParser jsonParser;
- protected IExternalFilterValueEmbedder valueEmbedder;
-
/**
* Initialize JSONDataParser with GeometryCoParser
*
* @param recordType defined type.
* @param jsonFactory Jackson JSON parser factory.
*/
- public AbstractJsonDataParser(ARecordType recordType, JsonFactory jsonFactory) {
+ public AbstractJsonDataParser(ARecordType recordType, JsonFactory jsonFactory,
+ IExternalDataRuntimeContext context) {
// recordType currently cannot be null, however this is to guarantee for any future changes.
this.rootType = recordType != null ? recordType : RecordUtil.FULLY_OPEN_RECORD_TYPE;
this.jsonFactory = jsonFactory;
//GeometryCoParser to parse GeoJSON objects to AsterixDB internal spatial types.
geometryCoParser = new GeometryCoParser(jsonParser);
parserContext = new ParserContext();
- valueEmbedder = NoOpFilterValueEmbedder.INSTANCE;
+ dataSourceName = context.getDatasourceNameSupplier();
+ lineNumber = context.getLineNumberSupplier();
+ valueEmbedder = context.getValueEmbedder();
}
/*
@@ -245,7 +246,7 @@
checkOptionalConstraints(recordType, nullBitMap);
}
- if (valueEmbedder.IsMissingEmbeddedValues()) {
+ if (valueEmbedder.isMissingEmbeddedValues()) {
String[] embeddedFieldNames = valueEmbedder.getEmbeddedFieldNames();
for (int i = 0; i < embeddedFieldNames.length; i++) {
String embeddedFieldName = embeddedFieldNames[i];
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 2936d11..bc855b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -34,17 +34,16 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -65,20 +64,20 @@
private final ArrayBackedValueStorage fieldValueBuffer;
private final DataOutput fieldValueBufferOutput;
private final IValueParser[] valueParsers;
- private FieldCursorForDelimitedDataParser cursor;
- private Supplier<String> dataSourceName;
- private LongSupplier lineNumber;
+ private final Supplier<String> dataSourceName;
+ private final LongSupplier lineNumber;
private final byte[] fieldTypeTags;
private final int[] fldIds;
private final ArrayBackedValueStorage[] nameBuffers;
private final char[] nullChars;
+ private FieldCursorForDelimitedDataParser cursor;
- public DelimitedDataParser(IHyracksTaskContext ctx, IValueParserFactory[] valueParserFactories, char fieldDelimiter,
- char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser, String nullString)
- throws HyracksDataException {
- this.dataSourceName = ExternalDataConstants.EMPTY_STRING;
- this.lineNumber = ExternalDataConstants.NO_LINES;
- this.warnings = ctx.getWarningCollector();
+ public DelimitedDataParser(IExternalDataRuntimeContext context, IValueParserFactory[] valueParserFactories,
+ char fieldDelimiter, char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser,
+ String nullString) throws HyracksDataException {
+ this.dataSourceName = context.getDatasourceNameSupplier();
+ this.lineNumber = context.getLineNumberSupplier();
+ this.warnings = context.getTaskContext().getWarningCollector();
this.fieldDelimiter = fieldDelimiter;
this.quote = quote;
this.hasHeader = hasHeader;
@@ -245,13 +244,6 @@
return true;
}
- @Override
- public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
- this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName;
- this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
-
- }
-
private String getDataSourceName() {
return dataSourceName.get();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
index 20b466b..f79cee1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
@@ -23,17 +23,15 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.parser.jackson.ADMToken;
-import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.provider.context.NoOpExternalRuntimeDataContext;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,13 +46,23 @@
public class JSONDataParser extends AbstractJsonDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
/**
- * Initialize JSONDataParser
+ * Used by several query runtime evaluators and tests; delegates with the no-op external data runtime context
*
* @param recordType defined type.
* @param jsonFactory Jackson JSON parser factory.
*/
public JSONDataParser(ARecordType recordType, JsonFactory jsonFactory) {
- super(recordType, jsonFactory);
+ this(recordType, jsonFactory, NoOpExternalRuntimeDataContext.INSTANCE);
+ }
+
+ /**
+ * Initialize JSONDataParser with the given external data runtime context
+ *
+ * @param recordType defined type.
+ * @param jsonFactory Jackson JSON parser factory.
+ */
+ public JSONDataParser(ARecordType recordType, JsonFactory jsonFactory, IExternalDataRuntimeContext context) {
+ super(recordType, jsonFactory, context);
}
@Override
@@ -62,11 +70,6 @@
setInput(jsonFactory.createParser(in));
}
- @Override
- public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
- this.valueEmbedder = valueEmbedder;
- }
-
public void setInputNode(JsonNode node) {
setInput(new TreeTraversingParser(node));
}
@@ -94,12 +97,6 @@
}
@Override
- public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
- this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName;
- this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
- }
-
- @Override
public boolean parse(DataOutput out) throws HyracksDataException {
try {
if (nextToken() == ADMToken.EOF) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index 820775c..d799f22 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -20,8 +20,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.external.api.IDataParser;
@@ -113,10 +111,4 @@
public void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws HyracksDataException {
rwm.appendPrimaryKeyToTuple(tb);
}
-
- @Override
- public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
- this.recordParser.configure(dataSourceName, lineNumber);
- this.converter.configure(lineNumber);
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/evaluators/StringJsonParseEval.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/evaluators/StringJsonParseEval.java
index 7589e33..23c3591 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/evaluators/StringJsonParseEval.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/evaluators/StringJsonParseEval.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.parser.JSONDataParser;
import org.apache.asterix.external.parser.factory.JSONDataParserFactory;
+import org.apache.asterix.external.provider.context.DefaultExternalRuntimeDataContext;
import org.apache.asterix.om.exceptions.ExceptionUtil;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
@@ -57,7 +58,8 @@
throws IOException {
this.ctx = ctx;
this.inputEval = inputEval;
- this.parser = (JSONDataParser) new JSONDataParserFactory().createInputStreamParser(ctx.getTaskContext(), 0);
+ this.parser = (JSONDataParser) new JSONDataParserFactory()
+ .createInputStreamParser(new DefaultExternalRuntimeDataContext(ctx.getTaskContext()));
this.sourceLocation = sourceLocation;
inputVal = new VoidPointable();
utf8Val = new UTF8StringPointable();
@@ -111,4 +113,4 @@
throw HyracksDataException.create(e);
}
}
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
index 394fcb3..edfa83e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
@@ -22,14 +22,13 @@
import java.util.Collections;
import java.util.List;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.external.parser.ADMDataParser;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ADMDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
@@ -38,7 +37,7 @@
Collections.unmodifiableList(Arrays.asList("adm", "semi-structured"));
@Override
- public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) {
+ public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) {
return createParser();
}
@@ -53,8 +52,7 @@
}
@Override
- public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
+ public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) {
return createParser();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
index 09f9697..8b88a69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
@@ -29,7 +30,6 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -41,17 +41,18 @@
ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_TSV));
@Override
- public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
- return createParser(ctx);
+ public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context)
+ throws HyracksDataException {
+ return createParser(context);
}
- private DelimitedDataParser createParser(IHyracksTaskContext ctx) throws HyracksDataException {
+ private DelimitedDataParser createParser(IExternalDataRuntimeContext context) throws HyracksDataException {
IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType);
char delimiter = ExternalDataUtils.validateGetDelimiter(configuration);
char quote = ExternalDataUtils.validateGetQuote(configuration, delimiter);
boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
String nullString = configuration.get(ExternalDataConstants.KEY_NULL_STR);
- return new DelimitedDataParser(ctx, valueParserFactories, delimiter, quote, hasHeader, recordType,
+ return new DelimitedDataParser(context, valueParserFactories, delimiter, quote, hasHeader, recordType,
ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM), nullString);
}
@@ -61,9 +62,8 @@
}
@Override
- public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- return createParser(ctx);
+ public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) throws HyracksDataException {
+ return createParser(context);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
index f9d50d3..6889067 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.external.parser.JSONDataParser;
@@ -33,8 +34,6 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
@@ -58,9 +57,8 @@
}
@Override
- public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- return createParser();
+ public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) {
+ return createParser(context);
}
@Override
@@ -74,8 +72,8 @@
}
@Override
- public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
- return createParser();
+ public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) {
+ return createParser(context);
}
@Override
@@ -83,8 +81,8 @@
return char[].class;
}
- private JSONDataParser createParser() throws HyracksDataException {
- return new JSONDataParser(recordType, jsonFactory);
+ private JSONDataParser createParser(IExternalDataRuntimeContext context) {
+ return new JSONDataParser(recordType, jsonFactory, context);
}
/*
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/NoOpDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/NoOpDataParserFactory.java
index f4fca17..82835bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/NoOpDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/NoOpDataParserFactory.java
@@ -23,13 +23,13 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
import org.apache.asterix.external.parser.NoOpDataParser;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -45,7 +45,7 @@
@Override
public void setRecordType(ARecordType recordType) throws AsterixException {
- //it always return open type
+ //it always returns open type
}
@Override
@@ -59,7 +59,8 @@
}
@Override
- public IRecordDataParser<IValueReference> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
+ public IRecordDataParser<IValueReference> createRecordParser(IExternalDataRuntimeContext context)
+ throws HyracksDataException {
return new NoOpDataParser();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
index a1d2298..13c2ec6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
@@ -23,11 +23,11 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
import org.apache.asterix.external.parser.RSSParser;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import com.rometools.rome.feed.synd.SyndEntry;
@@ -48,7 +48,7 @@
}
@Override
- public IRecordDataParser<SyndEntry> createRecordParser(IHyracksTaskContext ctx) {
+ public IRecordDataParser<SyndEntry> createRecordParser(IExternalDataRuntimeContext context) {
RSSParser dataParser = new RSSParser(recordType);
return dataParser;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
index 3ae9b82..f9093dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
@@ -36,7 +37,6 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class RecordWithMetadataParserFactory<I, O> implements IRecordDataParserFactory<I> {
@@ -98,9 +98,9 @@
}
@Override
- public IRecordDataParser<I> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
- IRecordDataParser<O> recordParser = recordParserFactory.createRecordParser(ctx);
- return new RecordWithMetadataParser<I, O>(metaType, recordParser, converterFactory.createConverter(ctx));
+ public IRecordDataParser<I> createRecordParser(IExternalDataRuntimeContext context) throws HyracksDataException {
+ IRecordDataParser<O> recordParser = recordParserFactory.createRecordParser(context);
+ return new RecordWithMetadataParser<I, O>(metaType, recordParser, converterFactory.createConverter(context));
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index e5b6d01..6083e21 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -23,12 +23,12 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
import org.apache.asterix.external.parser.TweetParser;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
public class TweetParserFactory implements IRecordDataParserFactory<char[]> {
@@ -48,9 +48,8 @@
}
@Override
- public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) {
- TweetParser dataParser = new TweetParser(recordType);
- return dataParser;
+ public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) {
+ return new TweetParser(recordType);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 83ec7e7..3817742 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordDataParser;
@@ -44,7 +45,6 @@
import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
import org.apache.asterix.external.dataflow.RecordDataFlowController;
import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.asterix.om.types.ARecordType;
@@ -62,19 +62,14 @@
int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
Map<String, String> configuration, boolean isFeed, IFeedLogManager feedLogManager)
throws HyracksDataException {
- IExternalFilterValueEmbedder valueEmbedder =
- dataSourceFactory.createFilterValueEmbedder(ctx.getWarningCollector());
+ IExternalDataRuntimeContext runtimeContext = dataSourceFactory.createExternalDataRuntimeContext(ctx, partition);
try {
switch (dataSourceFactory.getDataSourceType()) {
case RECORDS:
IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
- IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
- recordReader.setValueEmbedder(valueEmbedder);
+ IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(runtimeContext);
IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
- IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
- // TODO(ali): revisit to think about passing data source name via setter or via createRecordParser
- dataParser.configure(recordReader.getDataSourceName(), recordReader.getLineNumber());
- dataParser.setValueEmbedder(valueEmbedder);
+ IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(runtimeContext);
if (isFeed) {
boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
@@ -99,13 +94,11 @@
}
case STREAM:
IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
- AsterixInputStream stream = streamFactory.createInputStream(ctx, partition);
- stream.setValueEmbedder(valueEmbedder);
+ AsterixInputStream stream = streamFactory.createInputStream(runtimeContext);
IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
// TODO(ali): revisit to think about passing data source name to parser
- IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
+ IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(runtimeContext);
streamParser.setInputStream(stream);
- streamParser.setValueEmbedder(valueEmbedder);
if (isFeed) {
return new FeedStreamDataFlowController(ctx, feedLogManager, streamParser, stream);
} else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/DefaultExternalRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/DefaultExternalRuntimeDataContext.java
new file mode 100644
index 0000000..9fe2ff7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/DefaultExternalRuntimeDataContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider.context;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Baseline {@link IExternalDataRuntimeContext} that carries only the Hyracks task context and a partition number.
+ * It hands out the shared {@link NoOpFilterValueEmbedder} and the constant data-source-name and line-number
+ * suppliers from {@link ExternalDataConstants}; subclasses override those getters to supply real values.
+ */
+public class DefaultExternalRuntimeDataContext implements IExternalDataRuntimeContext {
+    private final IHyracksTaskContext taskContext;
+    private final int partition;
+
+    /**
+     * Creates a context that is not tied to a partition; the partition is reported as {@code -1}.
+     */
+    public DefaultExternalRuntimeDataContext(IHyracksTaskContext context) {
+        this(context, -1);
+    }
+
+    public DefaultExternalRuntimeDataContext(IHyracksTaskContext context, int partition) {
+        this.taskContext = context;
+        this.partition = partition;
+    }
+
+    @Override
+    public final IHyracksTaskContext getTaskContext() {
+        return taskContext;
+    }
+
+    @Override
+    public final int getPartition() {
+        return partition;
+    }
+
+    /** Returns the shared {@link NoOpFilterValueEmbedder} instance. */
+    @Override
+    public IExternalFilterValueEmbedder getValueEmbedder() {
+        return NoOpFilterValueEmbedder.INSTANCE;
+    }
+
+    @Override
+    public Supplier<String> getDatasourceNameSupplier() {
+        return ExternalDataConstants.EMPTY_STRING;
+    }
+
+    @Override
+    public LongSupplier getLineNumberSupplier() {
+        return ExternalDataConstants.NO_LINES;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java
new file mode 100644
index 0000000..587fc3e
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider.context;
+
+import java.util.Objects;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Runtime context that takes its data-source-name and line-number suppliers from a {@link StreamRecordReader}.
+ * The reader is attached after construction via {@link #setReader(StreamRecordReader)}; until then both supplier
+ * getters fail with a {@link NullPointerException}. The value embedder is stored and returned by the parent
+ * {@link ExternalStreamRuntimeDataContext}.
+ */
+public class ExternalReaderRuntimeDataContext extends ExternalStreamRuntimeDataContext {
+    private StreamRecordReader reader;
+
+    public ExternalReaderRuntimeDataContext(IHyracksTaskContext context, int partition,
+            IExternalFilterValueEmbedder valueEmbedder) {
+        super(context, partition, valueEmbedder);
+    }
+
+    @Override
+    public Supplier<String> getDatasourceNameSupplier() {
+        return requireReader().getDataSourceName();
+    }
+
+    @Override
+    public LongSupplier getLineNumberSupplier() {
+        return requireReader().getLineNumber();
+    }
+
+    public void setReader(StreamRecordReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * Returns the attached reader.
+     *
+     * @throws NullPointerException if no reader has been attached yet
+     */
+    private StreamRecordReader requireReader() {
+        // Safeguard: the suppliers cannot be handed out unless a reader has been set
+        return Objects.requireNonNull(reader, "Reader has not been set");
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalStreamRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalStreamRuntimeDataContext.java
new file mode 100644
index 0000000..f60f2c8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalStreamRuntimeDataContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider.context;
+
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * {@link DefaultExternalRuntimeDataContext} variant that hands out the value embedder it was constructed with
+ * instead of the default no-op one.
+ */
+public class ExternalStreamRuntimeDataContext extends DefaultExternalRuntimeDataContext {
+    private final IExternalFilterValueEmbedder embedder;
+
+    public ExternalStreamRuntimeDataContext(IHyracksTaskContext context, int partition,
+            IExternalFilterValueEmbedder valueEmbedder) {
+        super(context, partition);
+        embedder = valueEmbedder;
+    }
+
+    @Override
+    public IExternalFilterValueEmbedder getValueEmbedder() {
+        return embedder;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/NoOpExternalRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/NoOpExternalRuntimeDataContext.java
new file mode 100644
index 0000000..726ec23
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/NoOpExternalRuntimeDataContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider.context;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Stateless singleton {@link IExternalDataRuntimeContext} that carries no task context. It reports partition
+ * {@code -1}, the shared {@link NoOpFilterValueEmbedder} and the constant suppliers from
+ * {@link ExternalDataConstants}; asking it for the task context is a usage error.
+ */
+public final class NoOpExternalRuntimeDataContext implements IExternalDataRuntimeContext {
+    public static final IExternalDataRuntimeContext INSTANCE = new NoOpExternalRuntimeDataContext();
+
+    private NoOpExternalRuntimeDataContext() {
+    }
+
+    /**
+     * Always fails: this context is not backed by a task.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public IHyracksTaskContext getTaskContext() {
+        // Reported as an ordinary runtime exception: this is caller misuse, not a JVM linkage failure
+        throw new UnsupportedOperationException("should not be invoked");
+    }
+
+    @Override
+    public int getPartition() {
+        return -1;
+    }
+
+    @Override
+    public IExternalFilterValueEmbedder getValueEmbedder() {
+        return NoOpFilterValueEmbedder.INSTANCE;
+    }
+
+    @Override
+    public Supplier<String> getDatasourceNameSupplier() {
+        return ExternalDataConstants.EMPTY_STRING;
+    }
+
+    @Override
+    public LongSupplier getLineNumberSupplier() {
+        return ExternalDataConstants.NO_LINES;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index aa269fe..394295d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -20,8 +20,6 @@
package org.apache.asterix.external.util;
import static org.apache.asterix.external.util.ExternalDataConstants.COMPUTED_FIELD_PATTERN;
-import static org.apache.asterix.external.util.ExternalDataConstants.DEFINITION_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
import static org.apache.asterix.external.util.ExternalDataConstants.PREFIX_DEFAULT_DELIMITER;
import java.io.Serializable;
@@ -51,10 +49,10 @@
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.util.LogRedactionUtil;
-public class ExternalDataPrefix implements Serializable {
+public final class ExternalDataPrefix implements Serializable {
private static final long serialVersionUID = -7612997190679310483L;
private final String original;
- private String root;
+ private final String protocolContainerPair;
private final boolean endsWithSlash;
private final List<String> segments;
@@ -63,6 +61,7 @@
private final List<Integer> computedFieldSegmentIndexes = new ArrayList<>();
private final List<ARecordType> paths = new ArrayList<>();
private final Map<Integer, PrefixSegment> indexToComputedFieldsMap = new HashMap<>();
+ private String root;
public static final String PREFIX_ROOT_FIELD_NAME = "prefix-root";
public static final Set<ATypeTag> supportedTypes = new HashSet<>();
@@ -74,13 +73,10 @@
}
public ExternalDataPrefix(Map<String, String> configuration) throws AlgebricksException {
- this(getDefinitionOrPath(configuration));
- }
-
- public ExternalDataPrefix(String prefix) throws AlgebricksException {
+ String prefix = ExternalDataUtils.getDefinitionOrPath(configuration);
this.original = prefix != null ? prefix : "";
this.endsWithSlash = this.original.endsWith("/");
-
+ protocolContainerPair = ExternalDataUtils.getProtocolContainerPair(configuration);
segments = extractPrefixSegments(original);
extractComputedFields();
extractRoot();
@@ -291,8 +287,12 @@
return evaluator.evaluate();
}
+ public String removeProtocolContainerPair(String path) {
+ return path.replace(protocolContainerPair, "");
+ }
+
public static boolean containsComputedFields(Map<String, String> configuration) {
- String path = getDefinitionOrPath(configuration);
+ String path = ExternalDataUtils.getDefinitionOrPath(configuration);
return path != null && path.contains("{");
}
@@ -333,10 +333,6 @@
}
}
- private static String getDefinitionOrPath(Map<String, String> configuration) {
- return configuration.getOrDefault(DEFINITION_FIELD_NAME, configuration.get(KEY_PATH));
- }
-
public static class PrefixSegment implements Serializable {
private static final long serialVersionUID = 8788939199985336347L;
private String expression;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 599d2d1..6902175 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,11 +18,13 @@
*/
package org.apache.asterix.external.util;
+import static org.apache.asterix.external.util.ExternalDataConstants.DEFINITION_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
@@ -70,6 +72,8 @@
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
+import org.apache.asterix.external.util.google.gcs.GCSConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
@@ -1002,4 +1006,61 @@
         return !key.endsWith("/") && predicate.test(matchers, key)
                 && externalDataPrefix.evaluate(key, evaluator, warningCollector);
     }
+
+    /**
+     * Returns the value stored under {@code DEFINITION_FIELD_NAME}, falling back to the one stored under
+     * {@code KEY_PATH}; the result is {@code null} when the configuration has neither.
+     */
+    public static String getDefinitionOrPath(Map<String, String> configuration) {
+        return configuration.getOrDefault(DEFINITION_FIELD_NAME, configuration.get(KEY_PATH));
+    }
+
+    /**
+     * Builds the {@code <protocol>://<container>/} string for the external source described by the
+     * configuration. The container name and the source type are read from the configuration and default to the
+     * empty string. For the local file system the protocol is whatever precedes the first {@code ://} of the
+     * trimmed definition/path.
+     *
+     * @param configurations the external source configuration
+     * @return the protocol/container pair, or the empty string when the source type is not one of the handled
+     *         adapters or a local-file-system source has no definition/path configured
+     */
+    public static String getProtocolContainerPair(Map<String, String> configurations) {
+        String container = configurations.getOrDefault(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME, "");
+        String type = configurations.getOrDefault(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, "");
+        String protocol;
+        switch (type) {
+            case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
+                protocol = S3Constants.HADOOP_S3_PROTOCOL;
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
+                protocol = AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL;
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
+                protocol = AzureConstants.HADOOP_AZURE_DATALAKE_PROTOCOL;
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_GCS:
+                protocol = GCSConstants.HADOOP_GCS_PROTOCOL;
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS:
+                String path = getDefinitionOrPath(configurations);
+                if (path == null) {
+                    // getDefinitionOrPath() yields null when neither key is configured
+                    return "";
+                }
+                String trimmedPath = path.trim();
+                // indexOf() rather than split(): "://".split("://") is an empty array, so [0] would throw
+                int separator = trimmedPath.indexOf("://");
+                protocol = separator < 0 ? trimmedPath : trimmedPath.substring(0, separator);
+                break;
+            case ExternalDataConstants.KEY_HDFS_URL:
+                // NOTE(review): KEY_HDFS_URL serves as both the source-type label and the protocol -- confirm
+                protocol = ExternalDataConstants.KEY_HDFS_URL;
+                break;
+            default:
+                return "";
+        }
+
+        return protocol + "://" + container + "/";
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
index 4f36915..bad5db7 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -32,7 +33,6 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.IWarningCollector;
public class RecordWithPKTestReaderFactory implements IRecordReaderFactory<RecordWithPK<char[]>> {
@@ -56,8 +56,7 @@
}
@Override
- public IRecordReader<? extends RecordWithPK<char[]>> createRecordReader(final IHyracksTaskContext ctx,
- final int partition) {
+ public IRecordReader<? extends RecordWithPK<char[]>> createRecordReader(IExternalDataRuntimeContext context) {
return new TestAsterixMembersReader();
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index a21aeb6..7965d20 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -25,11 +25,11 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import com.couchbase.client.core.message.dcp.DCPRequest;
@@ -81,7 +81,8 @@
}
@Override
- public IRecordReader<? extends DCPRequest> createRecordReader(final IHyracksTaskContext ctx, final int partition) {
+ public IRecordReader<? extends DCPRequest> createRecordReader(IExternalDataRuntimeContext context) {
+ int partition = context.getPartition();
return new KVTestReader(partition, bucket, schedule,
(int) Math.ceil((double) numOfRecords / (double) numOfReaders), deleteCycle, upsertCycle,
(numOfRecords / numOfReaders) * partition);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
index dbcfe0a..68059c1 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.TreeMap;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
import org.apache.asterix.external.input.record.RecordWithPK;
@@ -32,7 +33,6 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@SuppressWarnings({ "unchecked" })
@@ -66,8 +66,9 @@
@SuppressWarnings("rawtypes")
@Override
- public IRecordDataParser<RecordWithPK<T>> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
- return new TestRecordWithPKParser(recordParserFactory.createRecordParser(ctx));
+ public IRecordDataParser<RecordWithPK<T>> createRecordParser(IExternalDataRuntimeContext context)
+ throws HyracksDataException {
+ return new TestRecordWithPKParser(recordParserFactory.createRecordParser(context));
}
@Override
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 888de08..994325e 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -34,6 +34,7 @@
import org.apache.asterix.external.input.stream.LocalFSInputStream;
import org.apache.asterix.external.parser.ADMDataParser;
import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.provider.context.DefaultExternalRuntimeDataContext;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
@@ -97,7 +98,7 @@
// create csv with json record reader
CSVToRecordWithMetadataAndPKConverter recordConverter =
new CSVToRecordWithMetadataAndPKConverter(valueIndex, delimiter, metaType, recordType, pkIndicators,
- pkIndexes, keyTypes, ctx.getWarningCollector());
+ pkIndexes, keyTypes, new DefaultExternalRuntimeDataContext(ctx));
// create the value parser <ADM in this case>
ADMDataParser valueParser = new ADMDataParser(recordType, false);
// create parser.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
index 22bfb31..bcd0ab1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
@@ -23,13 +23,13 @@
import java.util.Map;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.metadata.api.IDatasourceFunction;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -59,9 +59,9 @@
}
@Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context)
throws HyracksDataException {
- return function.createRecordReader(ctx, partition);
+ return function.createRecordReader(context.getTaskContext(), context.getPartition());
}
@Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
index 48d2908..d1cd1fa 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
@@ -29,6 +29,7 @@
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.TransactionState;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
@@ -71,6 +72,7 @@
}
public Map<String, String> getProperties() {
+ properties.computeIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, k -> getAdapter());
return properties;
}