[ASTERIXDB-3255][RT] Prepare for embedding values in parquet

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Preparing for embedding computed fields' values in Parquet.
This patch introduces IExternalDataRuntimeContext, which
could have different implementations when
creating parsers, readers, and/or input-streams.

Change-Id: Idde191ab6d99c48022bb65bec648e83e249e4b75
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17767
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/PrefixComputedFieldsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/PrefixComputedFieldsTest.java
index efa4c79..b8cc7fa 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/PrefixComputedFieldsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/PrefixComputedFieldsTest.java
@@ -24,17 +24,21 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.junit.Test;
 
 import junit.framework.TestCase;
 
 public class PrefixComputedFieldsTest extends TestCase {
+    private final Map<String, String> configuration = new HashMap<>();
 
     @Test
     public void test() throws Exception {
@@ -49,7 +53,7 @@
         assertTrue(prefix.getIndexToComputedFieldsMap().isEmpty());
 
         String prefix1 = "";
-        prefix = new ExternalDataPrefix(prefix1);
+        prefix = createExternalDataPrefix(prefix1);
         assertEquals("", prefix.getOriginal());
         assertEquals("", prefix.getRoot());
         assertFalse(prefix.isEndsWithSlash());
@@ -60,7 +64,7 @@
         assertTrue(prefix.getIndexToComputedFieldsMap().isEmpty());
 
         String prefix2 = "hotel";
-        prefix = new ExternalDataPrefix(prefix2);
+        prefix = createExternalDataPrefix(prefix2);
         assertEquals("hotel", prefix.getOriginal());
         assertEquals("hotel", prefix.getRoot());
         assertFalse(prefix.isEndsWithSlash());
@@ -71,7 +75,7 @@
         assertTrue(prefix.getIndexToComputedFieldsMap().isEmpty());
 
         String prefix3 = "hotel/{hotel-id:inT}/";
-        prefix = new ExternalDataPrefix(prefix3);
+        prefix = createExternalDataPrefix(prefix3);
         assertEquals("hotel/{hotel-id:inT}/", prefix.getOriginal());
         assertEquals("hotel/", prefix.getRoot());
         assertTrue(prefix.isEndsWithSlash());
@@ -82,7 +86,7 @@
         assertEquals("(.+)", prefix.getIndexToComputedFieldsMap().get(1).getExpression());
 
         String prefix4 = "hotel/{hotel-id:int}-{hotel-name:sTRing}";
-        prefix = new ExternalDataPrefix(prefix4);
+        prefix = createExternalDataPrefix(prefix4);
         assertEquals("hotel/{hotel-id:int}-{hotel-name:sTRing}", prefix.getOriginal());
         assertEquals("hotel", prefix.getRoot());
         assertFalse(prefix.isEndsWithSlash());
@@ -93,7 +97,7 @@
         assertEquals("(.+)-(.+)", prefix.getIndexToComputedFieldsMap().get(1).getExpression());
 
         String prefix5 = "hotel/something/{hotel-id:int}-{hotel-name:sTRing}/review/{year:int}-{month:int}-{day:int}/";
-        prefix = new ExternalDataPrefix(prefix5);
+        prefix = createExternalDataPrefix(prefix5);
         assertEquals("hotel/something/{hotel-id:int}-{hotel-name:sTRing}/review/{year:int}-{month:int}-{day:int}/",
                 prefix.getOriginal());
         assertEquals("hotel/something/", prefix.getRoot());
@@ -107,7 +111,7 @@
         assertEquals("(.+)-(.+)-(.+)", prefix.getIndexToComputedFieldsMap().get(4).getExpression());
 
         String prefix6 = "hotel/something/{hotel-id:int}-{hotel-name:sTRing}/review/{year:int}/{month:int}/{day:int}";
-        prefix = new ExternalDataPrefix(prefix6);
+        prefix = createExternalDataPrefix(prefix6);
         assertEquals("hotel/something/{hotel-id:int}-{hotel-name:sTRing}/review/{year:int}/{month:int}/{day:int}",
                 prefix.getOriginal());
         assertEquals("hotel/something", prefix.getRoot());
@@ -123,7 +127,7 @@
         assertEquals("(.+)", prefix.getIndexToComputedFieldsMap().get(6).getExpression());
 
         String prefix7 = "hotel/{hotel.details.id:int}-{hotel-name:sTRing}";
-        prefix = new ExternalDataPrefix(prefix7);
+        prefix = createExternalDataPrefix(prefix7);
         assertEquals("hotel/{hotel.details.id:int}-{hotel-name:sTRing}", prefix.getOriginal());
         assertEquals("hotel", prefix.getRoot());
         assertFalse(prefix.isEndsWithSlash());
@@ -134,7 +138,7 @@
 
         String prefix8 =
                 "hotel/hotel-{hotel-id:int}-hotel-{hotel-name:sTRing}/review/year-{year:int}/{month:int}-month/day-{day:int}-day";
-        prefix = new ExternalDataPrefix(prefix8);
+        prefix = createExternalDataPrefix(prefix8);
         assertEquals(
                 "hotel/hotel-{hotel-id:int}-hotel-{hotel-name:sTRing}/review/year-{year:int}/{month:int}-month/day-{day:int}-day",
                 prefix.getOriginal());
@@ -175,4 +179,10 @@
             System.out.println("\n");
         }
     }
+
+    private ExternalDataPrefix createExternalDataPrefix(String path) throws AlgebricksException {
+        configuration.clear();
+        configuration.put(ExternalDataConstants.KEY_PATH, path);
+        return new ExternalDataPrefix(configuration);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEmbeddedValueInformation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEmbeddedValueInformation.java
index 9ee37d8..39225cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEmbeddedValueInformation.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEmbeddedValueInformation.java
@@ -22,7 +22,7 @@
 
     String[] getEmbeddedFieldNames();
 
-    boolean IsMissingEmbeddedValues();
+    boolean isMissingEmbeddedValues();
 
     boolean isMissing(String fieldName);
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index 8eb4475..43e822e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@ -21,7 +21,6 @@
 import java.io.InputStream;
 
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.util.IFeedLogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -30,7 +29,6 @@
     protected AbstractFeedDataFlowController controller;
     protected IFeedLogManager logManager;
     protected IStreamNotificationHandler notificationHandler;
-    protected IExternalFilterValueEmbedder valueEmbedder;
 
     public abstract boolean stop() throws Exception;
 
@@ -50,10 +48,6 @@
         this.notificationHandler = notificationHandler;
     }
 
-    public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
-        this.valueEmbedder = valueEmbedder;
-    }
-
     public String getStreamName() {
         return "";
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index c1d6797..5dbc383 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -24,7 +24,6 @@
 import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableOrderedList;
 import org.apache.asterix.om.base.AMutableRecord;
@@ -39,9 +38,6 @@
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public interface IDataParser {
-    default void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
-        // NoOp
-    }
 
     /*
      * The following two static methods are expensive. right now, they are used by RSSFeeds and
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataRuntimeContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataRuntimeContext.java
new file mode 100644
index 0000000..00a9919
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataRuntimeContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A runtime context, where different {@link IRecordReader} and {@link IDataParser} can have different implementations
+ * of this interface. This context can be created by {@link IExternalDataSourceFactory}.
+ */
+public interface IExternalDataRuntimeContext {
+    IHyracksTaskContext getTaskContext();
+
+    int getPartition();
+
+    IExternalFilterValueEmbedder getValueEmbedder();
+
+    Supplier<String> getDatasourceNameSupplier();
+
+    LongSupplier getLineNumberSupplier();
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 058fe89..6be46ac 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -28,11 +28,11 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.provider.context.DefaultExternalRuntimeDataContext;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -71,8 +71,9 @@
     void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
             IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException;
 
-    default IExternalFilterValueEmbedder createFilterValueEmbedder(IWarningCollector warningCollector) {
-        return NoOpFilterValueEmbedder.INSTANCE;
+    default IExternalDataRuntimeContext createExternalDataRuntimeContext(IHyracksTaskContext context, int partition)
+            throws HyracksDataException {
+        return new DefaultExternalRuntimeDataContext(context, partition);
     }
 
     /**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamFactory.java
index a460e49..b430cf0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamFactory.java
@@ -18,15 +18,14 @@
  */
 package org.apache.asterix.external.api;
 
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IInputStreamFactory extends IExternalDataSourceFactory {
 
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+    AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException;
 
     @Override
-    public default DataSourceType getDataSourceType() {
+    default DataSourceType getDataSourceType() {
         return DataSourceType.STREAM;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index f544ca0..e4ab8bd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -19,18 +19,8 @@
 package org.apache.asterix.external.api;
 
 import java.io.IOException;
-import java.util.function.LongSupplier;
 
 @FunctionalInterface
 public interface IRecordConverter<I, O> {
-
-    public O convert(IRawRecord<? extends I> input) throws IOException;
-
-    /**
-     * Configures the converter with information suppliers from the {@link IRecordReader} data source.
-     *
-     * @param lineNumber line number supplier
-     */
-    default void configure(LongSupplier lineNumber) {
-    }
+    O convert(IRawRecord<? extends I> input) throws IOException;
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index 31e1764..2cb4c4b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -19,8 +19,6 @@
 package org.apache.asterix.external.api;
 
 import java.io.DataOutput;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -37,14 +35,5 @@
      * @return true if the record was parsed successfully and written to out. False, otherwise.
      * @throws HyracksDataException HyracksDataException
      */
-    public boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
-
-    /**
-     * Configures the parser with information suppliers from the {@link IRecordReader} data source.
-     *
-     * @param dataSourceName data source name supplier
-     * @param lineNumber     line number supplier
-     */
-    default void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
-    }
+    boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
index 2ddbbcd..b9032a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
@@ -19,16 +19,15 @@
 package org.apache.asterix.external.api;
 
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IRecordDataParserFactory<T> extends IDataParserFactory {
-    public IRecordDataParser<T> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException;
+    IRecordDataParser<T> createRecordParser(IExternalDataRuntimeContext context) throws HyracksDataException;
 
-    public Class<?> getRecordClass();
+    Class<?> getRecordClass();
 
     @Override
-    public default DataSourceType getDataSourceType() {
+    default DataSourceType getDataSourceType() {
         return DataSourceType.RECORDS;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index b7d4c85..b80e46c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -25,7 +25,6 @@
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IFeedLogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -75,10 +74,6 @@
      */
     public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException;
 
-    default void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
-        // NoOp
-    }
-
     /**
      * gives the record reader a chance to recover from IO errors during feed intake
      */
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index 8849508..64be02e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -22,12 +22,11 @@
 import java.util.Set;
 
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory {
 
-    IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+    IRecordReader<? extends T> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException;
 
     Class<?> getRecordClass();
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
index ad9acc6..7b56319 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
@@ -18,11 +18,9 @@
  */
 package org.apache.asterix.external.api;
 
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IStreamDataParserFactory extends IDataParserFactory {
 
-    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException;
+    IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index d7093b9..46afdbe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -33,14 +33,17 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
 import org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader;
 import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.context.ExternalStreamRuntimeDataContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
@@ -81,19 +84,21 @@
     private InputSplit[] inputSplits;
     private String nodeName;
     private Class recordReaderClazz;
+    private IExternalFilterEvaluatorFactory filterEvaluatorFactory;
 
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
-        JobConf hdfsConf = createHdfsConf(serviceCtx, configuration);
+        JobConf hdfsConf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         configureHdfsConf(hdfsConf, configuration);
     }
 
-    protected JobConf createHdfsConf(IServiceContext serviceCtx, Map<String, String> configuration)
-            throws HyracksDataException {
+    protected JobConf prepareHDFSConf(IServiceContext serviceCtx, Map<String, String> configuration,
+            IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws HyracksDataException {
         this.serviceCtx = serviceCtx;
         this.configuration = configuration;
+        this.filterEvaluatorFactory = filterEvaluatorFactory;
         init((ICCServiceContext) serviceCtx);
         return HDFSUtils.configureHDFSJobConf(configuration);
     }
@@ -210,9 +215,10 @@
      * 2. Indexing Stream Record Reader: When we transform the input into a byte stream and perform indexing.
      * 3. HDFS Record Reader: When we simply pass the Writable object as it is to the parser.
      */
+    @SuppressWarnings("unchecked")
     @Override
-    public IRecordReader<? extends Object> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
+    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+        IHyracksTaskContext ctx = context.getTaskContext();
         try {
             if (recordReaderClazz != null) {
                 StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
@@ -231,8 +237,7 @@
                  */
                 readerConf = confFactory.getConf();
             }
-            return createRecordReader(configuration, read, inputSplits, readSchedule, nodeName, readerConf,
-                    ctx.getWarningCollector());
+            return createRecordReader(configuration, read, inputSplits, readSchedule, nodeName, readerConf, context);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -248,11 +253,19 @@
         return recordReaderNames;
     }
 
-    private static IRecordReader<? extends Object> createRecordReader(Map<String, String> configuration, boolean[] read,
+    @Override
+    public IExternalDataRuntimeContext createExternalDataRuntimeContext(IHyracksTaskContext context, int partition) {
+        IExternalFilterValueEmbedder valueEmbedder =
+                filterEvaluatorFactory.createValueEmbedder(context.getWarningCollector());
+        return new ExternalStreamRuntimeDataContext(context, partition, valueEmbedder);
+    }
+
+    private static IRecordReader<?> createRecordReader(Map<String, String> configuration, boolean[] read,
             InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf,
-            IWarningCollector warningCollector) {
+            IExternalDataRuntimeContext context) {
         if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT.trim())
                 .equals(ExternalDataConstants.INPUT_FORMAT_PARQUET)) {
+            IWarningCollector warningCollector = context.getTaskContext().getWarningCollector();
             return new ParquetFileRecordReader<>(read, inputSplits, readSchedule, nodeName, conf, warningCollector);
         } else {
             return new HDFSRecordReader<>(read, inputSplits, readSchedule, nodeName, conf);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpFilterValueEmbedder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpFilterValueEmbedder.java
index fe15387..e4a9131 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpFilterValueEmbedder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpFilterValueEmbedder.java
@@ -45,17 +45,12 @@
     }
 
     @Override
-    public boolean shouldEmbed(IValueReference fieldName, ATypeTag typeTag) {
-        return false;
-    }
-
-    @Override
     public IValueReference getEmbeddedValue() {
         throw new IllegalAccessError("Cannot embed a value to " + this.getClass().getName());
     }
 
     @Override
-    public boolean IsMissingEmbeddedValues() {
+    public boolean isMissingEmbeddedValues() {
         return false;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/EmbeddedValueBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/EmbeddedValueBuilder.java
index ca6921b..5ffd7d9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/EmbeddedValueBuilder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/EmbeddedValueBuilder.java
@@ -60,7 +60,7 @@
     }
 
     void build(String path) throws HyracksDataException {
-        List<String> values = prefix.getValues(path);
+        List<String> values = prefix.getValues(prefix.removeProtocolContainerPair(path));
         build(allPaths, values);
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/ExternalFilterValueEmbedder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/ExternalFilterValueEmbedder.java
index 358cb64..2e9864b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/ExternalFilterValueEmbedder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/ExternalFilterValueEmbedder.java
@@ -34,7 +34,7 @@
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.api.IValueReference;
 
-public final class ExternalFilterValueEmbedder implements IExternalFilterValueEmbedder {
+public class ExternalFilterValueEmbedder implements IExternalFilterValueEmbedder {
     private final ARecordType allPaths;
     private final IWarningCollector warningCollector;
     private final Map<IAType, BitSet> setValues;
@@ -79,11 +79,6 @@
     }
 
     @Override
-    public boolean shouldEmbed(IValueReference fieldName, ATypeTag typeTag) {
-        throw new IllegalAccessError("Should not be invoked");
-    }
-
-    @Override
     public IValueReference getEmbeddedValue() {
         IValueReference value = builder.getValue(currentType);
         if (currentType.getTypeTag() != ATypeTag.OBJECT) {
@@ -96,7 +91,7 @@
     }
 
     @Override
-    public boolean IsMissingEmbeddedValues() {
+    public boolean isMissingEmbeddedValues() {
         if (currentType.getTypeTag() == ATypeTag.MISSING) {
             return false;
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/IExternalFilterValueEmbedder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/IExternalFilterValueEmbedder.java
index e4623c5..91bf32c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/IExternalFilterValueEmbedder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/embedder/IExternalFilterValueEmbedder.java
@@ -30,8 +30,6 @@
 
     boolean shouldEmbed(String fieldName, ATypeTag typeTag);
 
-    boolean shouldEmbed(IValueReference fieldName, ATypeTag typeTag);
-
     void enterObject();
 
     void exitObject();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
index 8b930aa..ad59f75 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
@@ -23,6 +23,7 @@
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.input.record.CharArrayRecord;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
@@ -39,17 +40,19 @@
     private final int valueIndex;
     private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
     private final CharArrayRecord record;
-    private LongSupplier lineNumber = ExternalDataConstants.NO_LINES;
+    private final LongSupplier lineNumber;
 
     public CSVToRecordWithMetadataAndPKConverter(final int valueIndex, final char delimiter, final ARecordType metaType,
             final ARecordType recordType, final int[] keyIndicator, final int[] keyIndexes, final IAType[] keyTypes,
-            IWarningCollector warningCollector) {
+            IExternalDataRuntimeContext context) {
+        IWarningCollector warningCollector = context.getTaskContext().getWarningCollector();
         this.cursor = new FieldCursorForDelimitedDataParser(null, delimiter, ExternalDataConstants.QUOTE,
                 warningCollector, ExternalDataConstants.EMPTY_STRING);
         this.record = new CharArrayRecord();
         this.valueIndex = valueIndex;
         this.recordWithMetadata = new RecordWithMetadataAndPK<>(record, metaType.getFieldTypes(), recordType,
                 keyIndicator, keyIndexes, keyTypes);
+        lineNumber = context.getLineNumberSupplier();
     }
 
     @Override
@@ -79,9 +82,4 @@
         }
         return recordWithMetadata;
     }
-
-    @Override
-    public void configure(LongSupplier lineNumber) {
-        this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
index a9e5bc7..1961500 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVWithRecordConverterFactory.java
@@ -22,12 +22,12 @@
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordConverter;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class CSVWithRecordConverterFactory implements IRecordConverterFactory<char[], RecordWithMetadataAndPK<char[]>> {
 
@@ -41,9 +41,10 @@
     private IAType[] keyTypes;
 
     @Override
-    public IRecordConverter<char[], RecordWithMetadataAndPK<char[]>> createConverter(IHyracksTaskContext ctx) {
+    public IRecordConverter<char[], RecordWithMetadataAndPK<char[]>> createConverter(
+            IExternalDataRuntimeContext context) {
         return new CSVToRecordWithMetadataAndPKConverter(recordIndex, delimiter, metaType, recordType, keyIndicators,
-                keyIndexes, keyTypes, ctx.getWarningCollector());
+                keyIndexes, keyTypes, context);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
index b228b94..eb4c8e1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
@@ -21,10 +21,10 @@
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordConverter;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import com.couchbase.client.core.message.dcp.DCPRequest;
 
@@ -51,7 +51,8 @@
     }
 
     @Override
-    public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter(IHyracksTaskContext ctx) {
+    public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter(
+            IExternalDataRuntimeContext context) {
         return new DCPMessageToRecordConverter();
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
index 875a331..96ab8ba 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/IRecordConverterFactory.java
@@ -22,22 +22,22 @@
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordConverter;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IRecordConverterFactory<I, O> extends Serializable {
 
-    public IRecordConverter<I, O> createConverter(IHyracksTaskContext ctx);
+    IRecordConverter<I, O> createConverter(IExternalDataRuntimeContext context);
 
-    public void configure(Map<String, String> configuration) throws AsterixException;
+    void configure(Map<String, String> configuration) throws AsterixException;
 
-    public Class<?> getInputClass();
+    Class<?> getInputClass();
 
-    public Class<?> getOutputClass();
+    Class<?> getOutputClass();
 
-    public void setRecordType(ARecordType recordType);
+    void setRecordType(ARecordType recordType);
 
-    public void setMetaType(ARecordType metaType);
+    void setMetaType(ARecordType metaType);
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
index 8ef19ad..8b43cb7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.logging.log4j.LogManager;
@@ -36,11 +37,14 @@
 
     // File fields
     protected final List<String> filePaths;
+    private final IExternalFilterValueEmbedder valueEmbedder;
     protected int nextFileIndex = 0;
 
-    public AbstractExternalInputStream(Map<String, String> configuration, List<String> filePaths) {
+    public AbstractExternalInputStream(Map<String, String> configuration, List<String> filePaths,
+            IExternalFilterValueEmbedder valueEmbedder) {
         this.configuration = configuration;
         this.filePaths = filePaths;
+        this.valueEmbedder = valueEmbedder;
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index 34e3d2d..a1d5d0a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -28,8 +28,10 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.provider.context.ExternalStreamRuntimeDataContext;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -52,7 +54,7 @@
     }
 
     @Override
-    public abstract AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+    public abstract AsterixInputStream createInputStream(IExternalDataRuntimeContext context)
             throws HyracksDataException;
 
     @Override
@@ -74,8 +76,10 @@
     }
 
     @Override
-    public IExternalFilterValueEmbedder createFilterValueEmbedder(IWarningCollector warningCollector) {
-        return filterEvaluatorFactory.createValueEmbedder(warningCollector);
+    public IExternalDataRuntimeContext createExternalDataRuntimeContext(IHyracksTaskContext context, int partition) {
+        IExternalFilterValueEmbedder valueEmbedder =
+                filterEvaluatorFactory.createValueEmbedder(context.getWarningCollector());
+        return new ExternalStreamRuntimeDataContext(context, partition, valueEmbedder);
     }
 
     public static class PartitionWorkLoadBasedOnSize implements Serializable {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index bbcf9cd..56feb8a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
@@ -50,8 +51,9 @@
     private final S3Client s3Client;
     private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
 
-    public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
-        super(configuration, filePaths);
+    public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths,
+            IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
+        super(configuration, filePaths, valueEmbedder);
         this.s3Client = buildAwsS3Client(configuration);
         this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 5b4e314..36d21d1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -26,13 +26,14 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -43,8 +44,11 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-        return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+    public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+        IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+        int partition = context.getPartition();
+        return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
+                valueEmbedder);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 98b5b06..f335538 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -83,7 +83,7 @@
 
         //Configure Hadoop S3 input splits
         try {
-            JobConf conf = createHdfsConf(serviceCtx, configuration);
+            JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
             int numberOfPartitions = getPartitionConstraint().getLocations().length;
             configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
             configureHdfsConf(conf, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
index bbfece2..52d16c1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,9 +47,9 @@
     private final BlobServiceClient client;
     private final String container;
 
-    public AzureBlobInputStream(IApplicationContext appCtx, Map<String, String> configuration, List<String> filePaths)
-            throws HyracksDataException {
-        super(configuration, filePaths);
+    public AzureBlobInputStream(IApplicationContext appCtx, Map<String, String> configuration, List<String> filePaths,
+            IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
+        super(configuration, filePaths, valueEmbedder);
         this.client = buildAzureClient(appCtx, configuration);
         this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index ada68ac..f31e823 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -29,12 +29,13 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -45,11 +46,13 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-        IApplicationContext appCtx =
-                (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+    public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+        IApplicationContext appCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+                .getServiceContext().getApplicationContext();
+        IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+        int partition = context.getPartition();
         return new AzureBlobInputStream(appCtx, configuration,
-                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
index 7a95222..ba47d57 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -47,8 +48,8 @@
     private final String container;
 
     public AzureDataLakeInputStream(IApplicationContext appCtx, Map<String, String> configuration,
-            List<String> filePaths) throws HyracksDataException {
-        super(configuration, filePaths);
+            List<String> filePaths, IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
+        super(configuration, filePaths, valueEmbedder);
         this.client = buildAzureClient(appCtx, configuration);
         this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index 89a1d80..41b041c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -29,11 +29,12 @@
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -45,11 +46,13 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-        IApplicationContext appCtx =
-                (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+    public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+        IApplicationContext appCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+                .getServiceContext().getApplicationContext();
+        IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+        int partition = context.getPartition();
         return new AzureDataLakeInputStream(appCtx, configuration,
-                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 25ac181..530ce74 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -79,7 +79,7 @@
         putAzureBlobConfToHadoopConf(configuration, path);
 
         // configure Hadoop Azure input splits
-        JobConf conf = createHdfsConf(serviceCtx, configuration);
+        JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index 3ef9c56..4dedb08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -66,7 +66,7 @@
         putAzureDataLakeConfToHadoopConf(configuration, path);
 
         //Configure Hadoop Azure input splits
-        JobConf conf = createHdfsConf(serviceCtx, configuration);
+        JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
index 4657bd0..ebc9907 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -49,8 +50,9 @@
     private final String container;
     private static final int MAX_ATTEMPTS = 5; // We try a total of 5 times in case of retryable errors
 
-    public GCSInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
-        super(configuration, filePaths);
+    public GCSInputStream(Map<String, String> configuration, List<String> filePaths,
+            IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
+        super(configuration, filePaths, valueEmbedder);
         this.client = buildClient(configuration);
         this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index 74f6428..b6ad3cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -26,13 +26,14 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -43,8 +44,11 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-        return new GCSInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+    public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+        IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+        int partition = context.getPartition();
+        return new GCSInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
+                valueEmbedder);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 50a1039..17cad3e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -69,7 +69,7 @@
         putGCSConfToHadoopConf(configuration, path);
 
         // configure hadoop input splits
-        JobConf conf = createHdfsConf(serviceCtx, configuration);
+        JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         int numberOfPartitions = getPartitionConstraint().getLocations().length;
         GCSUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
         configureHdfsConf(conf, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
index b5ade31..4736675 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReaderFactory.java
@@ -24,6 +24,7 @@
 
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -32,7 +33,6 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.http.server.HttpServerConfigBuilder;
@@ -54,10 +54,10 @@
     private List<Pair<String, Integer>> serverAddrs;
 
     @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+    public IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context)
             throws HyracksDataException {
         try {
-            return new HttpServerRecordReader(serverAddrs.get(partition).getRight(), entryPoint, queueSize,
+            return new HttpServerRecordReader(serverAddrs.get(context.getPartition()).getRight(), entryPoint, queueSize,
                     HttpServerConfigBuilder.createDefault());
         } catch (Exception e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index 0026f07..21ab53e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -27,6 +27,7 @@
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -34,7 +35,6 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -86,10 +86,10 @@
     }
 
     @Override
-    public IRecordReader<? extends SyndEntry> createRecordReader(IHyracksTaskContext ctx, int partition)
+    public IRecordReader<? extends SyndEntry> createRecordReader(IExternalDataRuntimeContext context)
             throws HyracksDataException {
         try {
-            return new RSSRecordReader(urls.get(partition));
+            return new RSSRecordReader(urls.get(context.getPartition()));
         } catch (MalformedURLException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 9d12325..937f3fe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -31,7 +31,6 @@
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IStreamNotificationHandler;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.CharArrayRecord;
 import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -49,11 +48,9 @@
     protected IFeedLogManager feedLogManager;
     private Supplier<String> dataSourceName = EMPTY_STRING;
     private Supplier<String> previousDataSourceName = EMPTY_STRING;
-    private AsterixInputStream inputStream;
 
     public void configure(AsterixInputStream inputStream, Map<String, String> config) {
         int bufferSize = ExternalDataUtils.getOrDefaultBufferSize(config);
-        this.inputStream = inputStream;
         this.reader = new AsterixInputStreamReader(inputStream, bufferSize);
         record = new CharArrayRecord();
         inputBuffer = new char[bufferSize];
@@ -123,11 +120,6 @@
         return dataSourceName;
     }
 
-    @Override
-    public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
-        inputStream.setValueEmbedder(valueEmbedder);
-    }
-
     String getPreviousStreamName() {
         return previousDataSourceName.get();
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index c9567c5..d8168ce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -35,6 +36,7 @@
 import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
 import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.provider.context.ExternalReaderRuntimeDataContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -82,22 +84,19 @@
     }
 
     @Override
-    public final IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+    public final IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context)
             throws HyracksDataException {
-        try {
-            StreamRecordReader streamRecordReader =
-                    (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
-            streamRecordReader.configure(ctx, streamFactory.createInputStream(ctx, partition), configuration);
-            return streamRecordReader;
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
-                | NoSuchMethodException e) {
-            throw HyracksDataException.create(e);
-        }
+        StreamRecordReader reader = createReader(context);
+        ((ExternalReaderRuntimeDataContext) context).setReader(reader);
+        return reader;
     }
 
     @Override
-    public final IExternalFilterValueEmbedder createFilterValueEmbedder(IWarningCollector warningCollector) {
-        return filterEvaluatorFactory.createValueEmbedder(warningCollector);
+    public final IExternalDataRuntimeContext createExternalDataRuntimeContext(IHyracksTaskContext context,
+            int partition) throws HyracksDataException {
+        IExternalFilterValueEmbedder valueEmbedder =
+                filterEvaluatorFactory.createValueEmbedder(context.getWarningCollector());
+        return new ExternalReaderRuntimeDataContext(context, partition, valueEmbedder);
     }
 
     @Override
@@ -118,4 +117,18 @@
             throw new CompilationException(ErrorCode.FEED_UNKNOWN_ADAPTER_NAME);
         }
     }
+
+    @SuppressWarnings("unchecked")
+    private StreamRecordReader createReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+        try {
+            StreamRecordReader streamRecordReader =
+                    (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
+            streamRecordReader.configure(context.getTaskContext(), streamFactory.createInputStream(context),
+                    configuration);
+            return streamRecordReader;
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
+                | NoSuchMethodException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index ef55d4a..2f87074 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -37,7 +38,6 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.logging.log4j.LogManager;
@@ -127,7 +127,7 @@
     }
 
     @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+    public IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context)
             throws HyracksDataException {
         IRecordReader<? extends char[]> recordReader;
         switch (configuration.get(ExternalDataConstants.KEY_READER)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 4f07a78..fd7907f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.INodeResolver;
 import org.apache.asterix.external.api.INodeResolverFactory;
@@ -42,7 +43,6 @@
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.io.UnmanagedFileSplit;
@@ -66,10 +66,10 @@
     private transient FileSystemWatcher watcher;
 
     @Override
-    public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+    public synchronized AsterixInputStream createInputStream(IExternalDataRuntimeContext context)
             throws HyracksDataException {
         if (watcher == null) {
-            String nodeName = ctx.getJobletContext().getServiceContext().getNodeId();
+            String nodeName = context.getTaskContext().getJobletContext().getServiceContext().getNodeId();
             ArrayList<Path> inputResources = new ArrayList<>();
             for (int i = 0; i < inputFileSplits.length; i++) {
                 if (inputFileSplits[i].getNodeName().equals(nodeName)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index ceabbfc..12c70bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.SocketClientInputStream;
@@ -38,7 +39,6 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -84,9 +84,9 @@
     }
 
     @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+    public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
         try {
-            return new SocketClientInputStream(sockets.get(partition));
+            return new SocketClientInputStream(sockets.get(context.getPartition()));
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index c8da459..7106489 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.SocketServerInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -35,7 +36,6 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -59,10 +59,10 @@
     }
 
     @Override
-    public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+    public synchronized AsterixInputStream createInputStream(IExternalDataRuntimeContext context)
             throws HyracksDataException {
         try {
-            Pair<String, Integer> socket = sockets.get(partition);
+            Pair<String, Integer> socket = sockets.get(context.getPartition());
             ServerSocket server;
             server = new ServerSocket();
             server.bind(new InetSocketAddress(socket.getRight()));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index 7a807a4..d486228 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -26,11 +26,11 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -93,9 +93,9 @@
     }
 
     @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+    public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
         try {
-            return new TwitterFirehoseInputStream(configuration, partition);
+            return new TwitterFirehoseInputStream(configuration, context.getPartition());
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
index d9e374f..de9962c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
@@ -30,12 +30,11 @@
 import org.apache.asterix.builders.IAsterixListBuilder;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.parser.jackson.ADMToken;
 import org.apache.asterix.external.parser.jackson.GeometryCoParser;
 import org.apache.asterix.external.parser.jackson.ParserContext;
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.base.AUnorderedList;
@@ -69,27 +68,29 @@
     protected final JsonFactory jsonFactory;
     protected final ARecordType rootType;
     protected final GeometryCoParser geometryCoParser;
-    protected Supplier<String> dataSourceName = ExternalDataConstants.EMPTY_STRING;
-    protected LongSupplier lineNumber = ExternalDataConstants.NO_LINES;
+    protected final Supplier<String> dataSourceName;
+    protected final LongSupplier lineNumber;
+    protected final IExternalFilterValueEmbedder valueEmbedder;
 
     protected JsonParser jsonParser;
 
-    protected IExternalFilterValueEmbedder valueEmbedder;
-
     /**
      * Initialize JSONDataParser with GeometryCoParser
      *
      * @param recordType  defined type.
      * @param jsonFactory Jackson JSON parser factory.
      */
-    public AbstractJsonDataParser(ARecordType recordType, JsonFactory jsonFactory) {
+    public AbstractJsonDataParser(ARecordType recordType, JsonFactory jsonFactory,
+            IExternalDataRuntimeContext context) {
         // recordType currently cannot be null, however this is to guarantee for any future changes.
         this.rootType = recordType != null ? recordType : RecordUtil.FULLY_OPEN_RECORD_TYPE;
         this.jsonFactory = jsonFactory;
         //GeometryCoParser to parse GeoJSON objects to AsterixDB internal spatial types.
         geometryCoParser = new GeometryCoParser(jsonParser);
         parserContext = new ParserContext();
-        valueEmbedder = NoOpFilterValueEmbedder.INSTANCE;
+        dataSourceName = context.getDatasourceNameSupplier();
+        lineNumber = context.getLineNumberSupplier();
+        valueEmbedder = context.getValueEmbedder();
     }
 
     /*
@@ -245,7 +246,7 @@
             checkOptionalConstraints(recordType, nullBitMap);
         }
 
-        if (valueEmbedder.IsMissingEmbeddedValues()) {
+        if (valueEmbedder.isMissingEmbeddedValues()) {
             String[] embeddedFieldNames = valueEmbedder.getEmbeddedFieldNames();
             for (int i = 0; i < embeddedFieldNames.length; i++) {
                 String embeddedFieldName = embeddedFieldNames[i];
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 2936d11..bc855b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -34,17 +34,16 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.base.AMutableString;
 import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -65,20 +64,20 @@
     private final ArrayBackedValueStorage fieldValueBuffer;
     private final DataOutput fieldValueBufferOutput;
     private final IValueParser[] valueParsers;
-    private FieldCursorForDelimitedDataParser cursor;
-    private Supplier<String> dataSourceName;
-    private LongSupplier lineNumber;
+    private final Supplier<String> dataSourceName;
+    private final LongSupplier lineNumber;
     private final byte[] fieldTypeTags;
     private final int[] fldIds;
     private final ArrayBackedValueStorage[] nameBuffers;
     private final char[] nullChars;
+    private FieldCursorForDelimitedDataParser cursor;
 
-    public DelimitedDataParser(IHyracksTaskContext ctx, IValueParserFactory[] valueParserFactories, char fieldDelimiter,
-            char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser, String nullString)
-            throws HyracksDataException {
-        this.dataSourceName = ExternalDataConstants.EMPTY_STRING;
-        this.lineNumber = ExternalDataConstants.NO_LINES;
-        this.warnings = ctx.getWarningCollector();
+    public DelimitedDataParser(IExternalDataRuntimeContext context, IValueParserFactory[] valueParserFactories,
+            char fieldDelimiter, char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser,
+            String nullString) throws HyracksDataException {
+        this.dataSourceName = context.getDatasourceNameSupplier();
+        this.lineNumber = context.getLineNumberSupplier();
+        this.warnings = context.getTaskContext().getWarningCollector();
         this.fieldDelimiter = fieldDelimiter;
         this.quote = quote;
         this.hasHeader = hasHeader;
@@ -245,13 +244,6 @@
         return true;
     }
 
-    @Override
-    public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
-        this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName;
-        this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
-
-    }
-
     private String getDataSourceName() {
         return dataSourceName.get();
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
index 20b466b..f79cee1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
@@ -23,17 +23,15 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.parser.jackson.ADMToken;
-import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.provider.context.NoOpExternalRuntimeDataContext;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -48,13 +46,23 @@
 public class JSONDataParser extends AbstractJsonDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
 
     /**
-     * Initialize JSONDataParser
+     * This constructor is used by several query runtime evaluators and tests
      *
      * @param recordType  defined type.
      * @param jsonFactory Jackson JSON parser factory.
      */
     public JSONDataParser(ARecordType recordType, JsonFactory jsonFactory) {
-        super(recordType, jsonFactory);
+        this(recordType, jsonFactory, NoOpExternalRuntimeDataContext.INSTANCE);
+    }
+
+    /**
+     * Initialize JSONDataParser
+     *
+     * @param recordType  defined type.
+     * @param jsonFactory Jackson JSON parser factory.
+     */
+    public JSONDataParser(ARecordType recordType, JsonFactory jsonFactory, IExternalDataRuntimeContext context) {
+        super(recordType, jsonFactory, context);
     }
 
     @Override
@@ -62,11 +70,6 @@
         setInput(jsonFactory.createParser(in));
     }
 
-    @Override
-    public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
-        this.valueEmbedder = valueEmbedder;
-    }
-
     public void setInputNode(JsonNode node) {
         setInput(new TreeTraversingParser(node));
     }
@@ -94,12 +97,6 @@
     }
 
     @Override
-    public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
-        this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName;
-        this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
-    }
-
-    @Override
     public boolean parse(DataOutput out) throws HyracksDataException {
         try {
             if (nextToken() == ADMToken.EOF) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index 820775c..d799f22 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -20,8 +20,6 @@
 
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
 
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.external.api.IDataParser;
@@ -113,10 +111,4 @@
     public void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws HyracksDataException {
         rwm.appendPrimaryKeyToTuple(tb);
     }
-
-    @Override
-    public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
-        this.recordParser.configure(dataSourceName, lineNumber);
-        this.converter.configure(lineNumber);
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/evaluators/StringJsonParseEval.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/evaluators/StringJsonParseEval.java
index 7589e33..23c3591 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/evaluators/StringJsonParseEval.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/evaluators/StringJsonParseEval.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.parser.JSONDataParser;
 import org.apache.asterix.external.parser.factory.JSONDataParserFactory;
+import org.apache.asterix.external.provider.context.DefaultExternalRuntimeDataContext;
 import org.apache.asterix.om.exceptions.ExceptionUtil;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
@@ -57,7 +58,8 @@
             throws IOException {
         this.ctx = ctx;
         this.inputEval = inputEval;
-        this.parser = (JSONDataParser) new JSONDataParserFactory().createInputStreamParser(ctx.getTaskContext(), 0);
+        this.parser = (JSONDataParser) new JSONDataParserFactory()
+                .createInputStreamParser(new DefaultExternalRuntimeDataContext(ctx.getTaskContext()));
         this.sourceLocation = sourceLocation;
         inputVal = new VoidPointable();
         utf8Val = new UTF8StringPointable();
@@ -111,4 +113,4 @@
             throw HyracksDataException.create(e);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
index 394fcb3..edfa83e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
@@ -22,14 +22,13 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ADMDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
 
@@ -38,7 +37,7 @@
             Collections.unmodifiableList(Arrays.asList("adm", "semi-structured"));
 
     @Override
-    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) {
+    public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) {
         return createParser();
     }
 
@@ -53,8 +52,7 @@
     }
 
     @Override
-    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
+    public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) {
         return createParser();
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
index 09f9697..8b88a69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
@@ -29,7 +30,6 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 
@@ -41,17 +41,18 @@
                     ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_TSV));
 
     @Override
-    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
-        return createParser(ctx);
+    public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context)
+            throws HyracksDataException {
+        return createParser(context);
     }
 
-    private DelimitedDataParser createParser(IHyracksTaskContext ctx) throws HyracksDataException {
+    private DelimitedDataParser createParser(IExternalDataRuntimeContext context) throws HyracksDataException {
         IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType);
         char delimiter = ExternalDataUtils.validateGetDelimiter(configuration);
         char quote = ExternalDataUtils.validateGetQuote(configuration, delimiter);
         boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
         String nullString = configuration.get(ExternalDataConstants.KEY_NULL_STR);
-        return new DelimitedDataParser(ctx, valueParserFactories, delimiter, quote, hasHeader, recordType,
+        return new DelimitedDataParser(context, valueParserFactories, delimiter, quote, hasHeader, recordType,
                 ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM), nullString);
     }
 
@@ -61,9 +62,8 @@
     }
 
     @Override
-    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        return createParser(ctx);
+    public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) throws HyracksDataException {
+        return createParser(context);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
index f9d50d3..6889067 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
@@ -24,6 +24,7 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.parser.JSONDataParser;
@@ -33,8 +34,6 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
@@ -58,9 +57,8 @@
     }
 
     @Override
-    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        return createParser();
+    public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) {
+        return createParser(context);
     }
 
     @Override
@@ -74,8 +72,8 @@
     }
 
     @Override
-    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
-        return createParser();
+    public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) {
+        return createParser(context);
     }
 
     @Override
@@ -83,8 +81,8 @@
         return char[].class;
     }
 
-    private JSONDataParser createParser() throws HyracksDataException {
-        return new JSONDataParser(recordType, jsonFactory);
+    private JSONDataParser createParser(IExternalDataRuntimeContext context) {
+        return new JSONDataParser(recordType, jsonFactory, context);
     }
 
     /*
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/NoOpDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/NoOpDataParserFactory.java
index f4fca17..82835bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/NoOpDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/NoOpDataParserFactory.java
@@ -23,13 +23,13 @@
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
 import org.apache.asterix.external.parser.NoOpDataParser;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
 
@@ -45,7 +45,7 @@
 
     @Override
     public void setRecordType(ARecordType recordType) throws AsterixException {
-        //it always return open type
+        //it always returns open type
     }
 
     @Override
@@ -59,7 +59,8 @@
     }
 
     @Override
-    public IRecordDataParser<IValueReference> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
+    public IRecordDataParser<IValueReference> createRecordParser(IExternalDataRuntimeContext context)
+            throws HyracksDataException {
         return new NoOpDataParser();
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
index a1d2298..13c2ec6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
@@ -23,11 +23,11 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
 import org.apache.asterix.external.parser.RSSParser;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 import com.rometools.rome.feed.synd.SyndEntry;
 
@@ -48,7 +48,7 @@
     }
 
     @Override
-    public IRecordDataParser<SyndEntry> createRecordParser(IHyracksTaskContext ctx) {
+    public IRecordDataParser<SyndEntry> createRecordParser(IExternalDataRuntimeContext context) {
         RSSParser dataParser = new RSSParser(recordType);
         return dataParser;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
index 3ae9b82..f9093dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -25,6 +25,7 @@
 
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
@@ -36,7 +37,6 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class RecordWithMetadataParserFactory<I, O> implements IRecordDataParserFactory<I> {
@@ -98,9 +98,9 @@
     }
 
     @Override
-    public IRecordDataParser<I> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
-        IRecordDataParser<O> recordParser = recordParserFactory.createRecordParser(ctx);
-        return new RecordWithMetadataParser<I, O>(metaType, recordParser, converterFactory.createConverter(ctx));
+    public IRecordDataParser<I> createRecordParser(IExternalDataRuntimeContext context) throws HyracksDataException {
+        IRecordDataParser<O> recordParser = recordParserFactory.createRecordParser(context);
+        return new RecordWithMetadataParser<I, O>(metaType, recordParser, converterFactory.createConverter(context));
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index e5b6d01..6083e21 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -23,12 +23,12 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
 import org.apache.asterix.external.parser.TweetParser;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class TweetParserFactory implements IRecordDataParserFactory<char[]> {
 
@@ -48,9 +48,8 @@
     }
 
     @Override
-    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) {
-        TweetParser dataParser = new TweetParser(recordType);
-        return dataParser;
+    public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) {
+        return new TweetParser(recordType);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 83ec7e7..3817742 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordDataParser;
@@ -44,7 +45,6 @@
 import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.IFeedLogManager;
 import org.apache.asterix.om.types.ARecordType;
@@ -62,19 +62,14 @@
             int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
             Map<String, String> configuration, boolean isFeed, IFeedLogManager feedLogManager)
             throws HyracksDataException {
-        IExternalFilterValueEmbedder valueEmbedder =
-                dataSourceFactory.createFilterValueEmbedder(ctx.getWarningCollector());
+        IExternalDataRuntimeContext runtimeContext = dataSourceFactory.createExternalDataRuntimeContext(ctx, partition);
         try {
             switch (dataSourceFactory.getDataSourceType()) {
                 case RECORDS:
                     IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
-                    IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
-                    recordReader.setValueEmbedder(valueEmbedder);
+                    IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(runtimeContext);
                     IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
-                    IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
-                    // TODO(ali): revisit to think about passing data source name via setter or via createRecordParser
-                    dataParser.configure(recordReader.getDataSourceName(), recordReader.getLineNumber());
-                    dataParser.setValueEmbedder(valueEmbedder);
+                    IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(runtimeContext);
                     if (isFeed) {
                         boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
                         boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
@@ -99,13 +94,11 @@
                     }
                 case STREAM:
                     IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
-                    AsterixInputStream stream = streamFactory.createInputStream(ctx, partition);
-                    stream.setValueEmbedder(valueEmbedder);
+                    AsterixInputStream stream = streamFactory.createInputStream(runtimeContext);
                     IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory;
                     // TODO(ali): revisit to think about passing data source name to parser
-                    IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
+                    IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(runtimeContext);
                     streamParser.setInputStream(stream);
-                    streamParser.setValueEmbedder(valueEmbedder);
                     if (isFeed) {
                         return new FeedStreamDataFlowController(ctx, feedLogManager, streamParser, stream);
                     } else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/DefaultExternalRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/DefaultExternalRuntimeDataContext.java
new file mode 100644
index 0000000..9fe2ff7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/DefaultExternalRuntimeDataContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider.context;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class DefaultExternalRuntimeDataContext implements IExternalDataRuntimeContext {
+    private final IHyracksTaskContext context;
+    private final int partition;
+
+    public DefaultExternalRuntimeDataContext(IHyracksTaskContext context) {
+        this(context, -1);
+    }
+
+    public DefaultExternalRuntimeDataContext(IHyracksTaskContext context, int partition) {
+        this.context = context;
+        this.partition = partition;
+    }
+
+    @Override
+    public final IHyracksTaskContext getTaskContext() {
+        return context;
+    }
+
+    @Override
+    public final int getPartition() {
+        return partition;
+    }
+
+    @Override
+    public IExternalFilterValueEmbedder getValueEmbedder() {
+        return NoOpFilterValueEmbedder.INSTANCE;
+    }
+
+    @Override
+    public Supplier<String> getDatasourceNameSupplier() {
+        return ExternalDataConstants.EMPTY_STRING;
+    }
+
+    @Override
+    public LongSupplier getLineNumberSupplier() {
+        return ExternalDataConstants.NO_LINES;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java
new file mode 100644
index 0000000..587fc3e
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalReaderRuntimeDataContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider.context;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class ExternalReaderRuntimeDataContext extends ExternalStreamRuntimeDataContext {
+    private final IExternalFilterValueEmbedder valueEmbedder;
+    private StreamRecordReader reader;
+
+    public ExternalReaderRuntimeDataContext(IHyracksTaskContext context, int partition,
+            IExternalFilterValueEmbedder valueEmbedder) {
+        super(context, partition, valueEmbedder);
+        this.valueEmbedder = valueEmbedder;
+        reader = null;
+    }
+
+    @Override
+    public Supplier<String> getDatasourceNameSupplier() {
+        if (reader == null) {
+            // Safeguard: cannot invoke this method unless a reader has been set
+            throw new NullPointerException("Reader has not been set");
+        }
+        return reader.getDataSourceName();
+    }
+
+    @Override
+    public LongSupplier getLineNumberSupplier() {
+        if (reader == null) {
+            // Safeguard: cannot invoke this method unless a reader has been set
+            throw new NullPointerException("Reader has not been set");
+        }
+        return reader.getLineNumber();
+    }
+
+    @Override
+    public IExternalFilterValueEmbedder getValueEmbedder() {
+        return valueEmbedder;
+    }
+
+    public void setReader(StreamRecordReader reader) {
+        this.reader = reader;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalStreamRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalStreamRuntimeDataContext.java
new file mode 100644
index 0000000..f60f2c8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/ExternalStreamRuntimeDataContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider.context;
+
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class ExternalStreamRuntimeDataContext extends DefaultExternalRuntimeDataContext {
+    private final IExternalFilterValueEmbedder valueEmbedder;
+
+    public ExternalStreamRuntimeDataContext(IHyracksTaskContext context, int partition,
+            IExternalFilterValueEmbedder valueEmbedder) {
+        super(context, partition);
+        this.valueEmbedder = valueEmbedder;
+    }
+
+    @Override
+    public IExternalFilterValueEmbedder getValueEmbedder() {
+        return valueEmbedder;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/NoOpExternalRuntimeDataContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/NoOpExternalRuntimeDataContext.java
new file mode 100644
index 0000000..726ec23
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/context/NoOpExternalRuntimeDataContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.provider.context;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.NoOpFilterValueEmbedder;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public final class NoOpExternalRuntimeDataContext implements IExternalDataRuntimeContext {
+    public static final IExternalDataRuntimeContext INSTANCE = new NoOpExternalRuntimeDataContext();
+
+    private NoOpExternalRuntimeDataContext() {
+    }
+
+    @Override
+    public IHyracksTaskContext getTaskContext() {
+        throw new IllegalAccessError("should not be invoked");
+    }
+
+    @Override
+    public int getPartition() {
+        return -1;
+    }
+
+    @Override
+    public IExternalFilterValueEmbedder getValueEmbedder() {
+        return NoOpFilterValueEmbedder.INSTANCE;
+    }
+
+    @Override
+    public Supplier<String> getDatasourceNameSupplier() {
+        return ExternalDataConstants.EMPTY_STRING;
+    }
+
+    @Override
+    public LongSupplier getLineNumberSupplier() {
+        return ExternalDataConstants.NO_LINES;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index aa269fe..394295d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -20,8 +20,6 @@
 package org.apache.asterix.external.util;
 
 import static org.apache.asterix.external.util.ExternalDataConstants.COMPUTED_FIELD_PATTERN;
-import static org.apache.asterix.external.util.ExternalDataConstants.DEFINITION_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
 import static org.apache.asterix.external.util.ExternalDataConstants.PREFIX_DEFAULT_DELIMITER;
 
 import java.io.Serializable;
@@ -51,10 +49,10 @@
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.util.LogRedactionUtil;
 
-public class ExternalDataPrefix implements Serializable {
+public final class ExternalDataPrefix implements Serializable {
     private static final long serialVersionUID = -7612997190679310483L;
     private final String original;
-    private String root;
+    private final String protocolContainerPair;
     private final boolean endsWithSlash;
     private final List<String> segments;
 
@@ -63,6 +61,7 @@
     private final List<Integer> computedFieldSegmentIndexes = new ArrayList<>();
     private final List<ARecordType> paths = new ArrayList<>();
     private final Map<Integer, PrefixSegment> indexToComputedFieldsMap = new HashMap<>();
+    private String root;
 
     public static final String PREFIX_ROOT_FIELD_NAME = "prefix-root";
     public static final Set<ATypeTag> supportedTypes = new HashSet<>();
@@ -74,13 +73,10 @@
     }
 
     public ExternalDataPrefix(Map<String, String> configuration) throws AlgebricksException {
-        this(getDefinitionOrPath(configuration));
-    }
-
-    public ExternalDataPrefix(String prefix) throws AlgebricksException {
+        String prefix = ExternalDataUtils.getDefinitionOrPath(configuration);
         this.original = prefix != null ? prefix : "";
         this.endsWithSlash = this.original.endsWith("/");
-
+        protocolContainerPair = ExternalDataUtils.getProtocolContainerPair(configuration);
         segments = extractPrefixSegments(original);
         extractComputedFields();
         extractRoot();
@@ -291,8 +287,12 @@
         return evaluator.evaluate();
     }
 
+    public String removeProtocolContainerPair(String path) {
+        return path.replace(protocolContainerPair, "");
+    }
+
     public static boolean containsComputedFields(Map<String, String> configuration) {
-        String path = getDefinitionOrPath(configuration);
+        String path = ExternalDataUtils.getDefinitionOrPath(configuration);
         return path != null && path.contains("{");
     }
 
@@ -333,10 +333,6 @@
         }
     }
 
-    private static String getDefinitionOrPath(Map<String, String> configuration) {
-        return configuration.getOrDefault(DEFINITION_FIELD_NAME, configuration.get(KEY_PATH));
-    }
-
     public static class PrefixSegment implements Serializable {
         private static final long serialVersionUID = 8788939199985336347L;
         private String expression;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 599d2d1..6902175 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,11 +18,13 @@
  */
 package org.apache.asterix.external.util;
 
+import static org.apache.asterix.external.util.ExternalDataConstants.DEFINITION_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
@@ -70,6 +72,8 @@
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
+import org.apache.asterix.external.util.google.gcs.GCSConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
@@ -1002,4 +1006,40 @@
         return !key.endsWith("/") && predicate.test(matchers, key)
                 && externalDataPrefix.evaluate(key, evaluator, warningCollector);
     }
+
+    public static String getDefinitionOrPath(Map<String, String> configuration) {
+        return configuration.getOrDefault(DEFINITION_FIELD_NAME, configuration.get(KEY_PATH));
+    }
+
+    public static String getProtocolContainerPair(Map<String, String> configurations) {
+        String container = configurations.getOrDefault(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME, "");
+        String type = configurations.getOrDefault(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, "");
+        String protocol;
+        switch (type) {
+            case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
+                protocol = S3Constants.HADOOP_S3_PROTOCOL;
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
+                protocol = AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL;
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
+                protocol = AzureConstants.HADOOP_AZURE_DATALAKE_PROTOCOL;
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_GCS:
+                protocol = GCSConstants.HADOOP_GCS_PROTOCOL;
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS:
+                String path = getDefinitionOrPath(configurations);
+                String[] nodePathPair = path.trim().split("://");
+                protocol = nodePathPair[0];
+                break;
+            case ExternalDataConstants.KEY_HDFS_URL:
+                protocol = ExternalDataConstants.KEY_HDFS_URL;
+                break;
+            default:
+                return "";
+        }
+
+        return protocol + "://" + container + "/";
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
index 4f36915..bad5db7 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -25,6 +25,7 @@
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -32,7 +33,6 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
 public class RecordWithPKTestReaderFactory implements IRecordReaderFactory<RecordWithPK<char[]>> {
@@ -56,8 +56,7 @@
     }
 
     @Override
-    public IRecordReader<? extends RecordWithPK<char[]>> createRecordReader(final IHyracksTaskContext ctx,
-            final int partition) {
+    public IRecordReader<? extends RecordWithPK<char[]>> createRecordReader(IExternalDataRuntimeContext context) {
         return new TestAsterixMembersReader();
     }
 
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index a21aeb6..7965d20 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -25,11 +25,11 @@
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
 import com.couchbase.client.core.message.dcp.DCPRequest;
@@ -81,7 +81,8 @@
     }
 
     @Override
-    public IRecordReader<? extends DCPRequest> createRecordReader(final IHyracksTaskContext ctx, final int partition) {
+    public IRecordReader<? extends DCPRequest> createRecordReader(IExternalDataRuntimeContext context) {
+        int partition = context.getPartition();
         return new KVTestReader(partition, bucket, schedule,
                 (int) Math.ceil((double) numOfRecords / (double) numOfReaders), deleteCycle, upsertCycle,
                 (numOfRecords / numOfReaders) * partition);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
index dbcfe0a..68059c1 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordDataParserFactory;
 import org.apache.asterix.external.input.record.RecordWithPK;
@@ -32,7 +33,6 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 @SuppressWarnings({ "unchecked" })
@@ -66,8 +66,9 @@
 
     @SuppressWarnings("rawtypes")
     @Override
-    public IRecordDataParser<RecordWithPK<T>> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
-        return new TestRecordWithPKParser(recordParserFactory.createRecordParser(ctx));
+    public IRecordDataParser<RecordWithPK<T>> createRecordParser(IExternalDataRuntimeContext context)
+            throws HyracksDataException {
+        return new TestRecordWithPKParser(recordParserFactory.createRecordParser(context));
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 888de08..994325e 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.provider.context.DefaultExternalRuntimeDataContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
@@ -97,7 +98,7 @@
             // create csv with json record reader
             CSVToRecordWithMetadataAndPKConverter recordConverter =
                     new CSVToRecordWithMetadataAndPKConverter(valueIndex, delimiter, metaType, recordType, pkIndicators,
-                            pkIndexes, keyTypes, ctx.getWarningCollector());
+                            pkIndexes, keyTypes, new DefaultExternalRuntimeDataContext(ctx));
             // create the value parser <ADM in this case>
             ADMDataParser valueParser = new ADMDataParser(recordType, false);
             // create parser.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
index 22bfb31..bcd0ab1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
@@ -23,13 +23,13 @@
 import java.util.Map;
 
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.metadata.api.IDatasourceFunction;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -59,9 +59,9 @@
     }
 
     @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+    public IRecordReader<? extends char[]> createRecordReader(IExternalDataRuntimeContext context)
             throws HyracksDataException {
-        return function.createRecordReader(ctx, partition);
+        return function.createRecordReader(context.getTaskContext(), context.getPartition());
     }
 
     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
index 48d2908..d1cd1fa 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalDatasetDetails.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
@@ -71,6 +72,7 @@
     }
 
     public Map<String, String> getProperties() {
+        properties.computeIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, k -> getAdapter());
         return properties;
     }