Feed Adapter and RecordReader Clean

1. Cleaned DatasourceFactoryProvider with service interface
2. Cleaned StreamRecordReaderProvider with service interface
3. Delayed the Inputstream binding to configuration phase.
4. Added one simple test case for StreamRecordReaderProvider.
5. Moved twitter library check into configure method.
6. Refactored parser interfaces and variable names to make them
   consistent with the record reader intreface.

Change-Id: I699657ddd8408fd00bcbd7df57b6610ef3692a1a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1535
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 23d6727..b7eef11 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -179,6 +179,10 @@
     public static final int FEED_METADATA_UTIL_UNEXPECTED_FEED_DATATYPE = 3080;
     public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED = 3081;
     public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC = 3082;
+    public static final int PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING = 3083;
+    public static final int PROVIDER_STREAM_RECORD_READER_DUPLICATE_FORMAT_MAPPING = 3084;
+    public static final int FEED_UNKNOWN_ADAPTER_NAME = 3085;
+    public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086;
     public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087;
 
     private ErrorCode() {
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index e1a54cf..c1f5cc7 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -168,4 +168,8 @@
 3080 = Unexpected feed datatype '%1$s'
 3081 = socket is not properly configured
 3082 = "Invalid %1$s %2$s as it is not part of the AsterixDB cluster. Valid choices are %3$s"
+3083 = Duplicate feed adaptor name: %1$s
+3084 = Duplicate record reader format: %1$s
+3085 = Unknown Adapter Name.
+3086 = Cannot find record reader %1$s with specified configuration.
 3087 = Cannot find function %1$s
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index e341fef..ff9fcb0 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -426,10 +426,5 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>2.5</version>
-    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
index 3dd3903..de104d4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.api;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -59,5 +60,10 @@
      */
     public void setMetaType(ARecordType metaType);
 
-    public String[] getFormats();
+    /**
+     * Get the formats that are handled by this parser.
+     *
+     * @return A list of formats
+     */
+    public List<String> getParserFormats();
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index c6adbc4..a8c5d31 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -21,6 +21,8 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+import java.util.List;
+
 public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory {
 
     public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition)
@@ -32,4 +34,7 @@
     public default DataSourceType getDataSourceType() {
         return DataSourceType.RECORDS;
     }
+
+    public List<String> getRecordReaderNames();
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index b58feba..cbf784e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -38,7 +39,6 @@
 import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.external.provider.ExternalIndexerProvider;
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
-import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
@@ -76,7 +76,8 @@
     private JobConf conf;
     private InputSplit[] inputSplits;
     private String nodeName;
-    private Format format;
+    private Class recordReaderClazz;
+    private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList("hdfs"));
 
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration) throws AsterixException {
@@ -109,7 +110,7 @@
                 this.recordClass = reader.createValue().getClass();
                 reader.close();
             } else {
-                format = StreamRecordReaderProvider.getReaderFormat(configuration);
+                recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
                 this.recordClass = char[].class;
             }
         } catch (IOException e) {
@@ -202,9 +203,9 @@
             throws HyracksDataException {
         try {
             IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
-            if (format != null) {
-                StreamRecordReader streamReader = StreamRecordReaderProvider.createRecordReader(format,
-                        createInputStream(ctx, partition, indexer), configuration);
+            if (recordReaderClazz != null) {
+                StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
+                streamReader.configure(createInputStream(ctx, partition, indexer), configuration);
                 if (indexer != null) {
                     return new IndexingStreamRecordReader(streamReader, indexer);
                 } else {
@@ -232,4 +233,9 @@
     public boolean isIndexingOp() {
         return ((files != null) && indexingOp);
     }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index 03200a9..831dbc3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -20,6 +20,8 @@
 
 import java.net.MalformedURLException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +44,7 @@
     private final List<String> urls = new ArrayList<>();
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     private transient IServiceContext serviceContext;
+    private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList("rss_feed"));
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -75,6 +78,11 @@
     }
 
     @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
     public boolean isIndexible() {
         return false;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index aa0451a..e64c79e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -19,15 +19,20 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 
 public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
 
-    public EmptyLineSeparatedRecordReader(AsterixInputStream inputStream) {
-        super(inputStream);
-    }
+    private static final List<String> recordReaderFormats = Collections
+            .unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_LINE_SEPARATED));
+    private static final String REQUIRED_CONFIGS = "";
+    protected Map<String, String> config;
 
     private boolean prevCharCR;
     private boolean prevCharLF;
@@ -100,6 +105,16 @@
         return true;
     }
 
+    @Override
+    public List<String> getRecordReaderFormats() {
+        return recordReaderFormats;
+    }
+
+    @Override
+    public String getRequiredConfigs() {
+        return REQUIRED_CONFIGS;
+    }
+
     private boolean skipWhiteSpace() throws IOException {
         // start by skipping white spaces
         while (true) {
@@ -118,4 +133,10 @@
             }
         }
     }
+
+    @Override
+    public void configure(AsterixInputStream inputStream, Map<String, String> config) {
+        super.configure(inputStream);
+        this.config = config;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 59b72e4..0b41d4b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -19,24 +19,33 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class LineRecordReader extends StreamRecordReader {
 
-    private final boolean hasHeader;
+    private boolean hasHeader;
     protected boolean prevCharCR;
     protected int newlineLength;
     protected int recordNumber = 0;
     protected boolean nextIsHeader = false;
+    private static final List<String> recordReaderFormats = Collections.unmodifiableList(
+            Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
+    private static final String REQUIRED_CONFIGS = "";
 
-    public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream) throws HyracksDataException {
-        super(stream);
-        this.hasHeader = hasHeader;
+    @Override
+    public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
+        super.configure(inputStream);
+        this.hasHeader = ExternalDataUtils.hasHeader(config);
         if (hasHeader) {
-            stream.setNotificationHandler(this);
+            inputStream.setNotificationHandler(this);
         }
     }
 
@@ -48,6 +57,16 @@
     }
 
     @Override
+    public List<String> getRecordReaderFormats() {
+        return recordReaderFormats;
+    }
+
+    @Override
+    public String getRequiredConfigs() {
+        return REQUIRED_CONFIGS;
+    }
+
+    @Override
     public boolean hasNext() throws IOException {
         while (true) {
             if (done) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 006bf0b..4c4128a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -19,6 +19,10 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
@@ -27,21 +31,35 @@
 
 public class QuotedLineRecordReader extends LineRecordReader {
 
-    private final char quote;
+    private char quote;
     private boolean prevCharEscape;
     private boolean inQuote;
+    private static final List<String> recordReaderFormats = Collections.unmodifiableList(
+            Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
+    private static final String REQUIRED_CONFIGS = "quote";
 
-    public QuotedLineRecordReader(final boolean hasHeader, final AsterixInputStream stream, final String quoteString)
-            throws HyracksDataException {
-        super(hasHeader, stream);
-        if ((quoteString == null) || (quoteString.length() != 1)) {
-            throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(
-                    ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
+    @Override
+    public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
+        super.configure(inputStream, config);
+        String quoteString = config.get(ExternalDataConstants.KEY_QUOTE);
+        if (quoteString.length() != 1) {
+            throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE,
+                    ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
         }
         this.quote = quoteString.charAt(0);
     }
 
     @Override
+    public List<String> getRecordReaderFormats() {
+        return recordReaderFormats;
+    }
+
+    @Override
+    public String getRequiredConfigs() {
+        return REQUIRED_CONFIGS;
+    }
+
+    @Override
     public boolean hasNext() throws IOException {
         while (true) {
             if (done) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 7614e6eb..e1e1f08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -19,6 +19,10 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
@@ -35,10 +39,16 @@
     private char recordStart;
     private char recordEnd;
     private int recordNumber = 0;
+    private static final List<String> recordReaderFormats = Collections
+            .unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_ADM, ExternalDataConstants.FORMAT_JSON,
+                    ExternalDataConstants.FORMAT_SEMISTRUCTURED));
+    private static final String REQUIRED_CONFIGS = "";
 
-    public SemiStructuredRecordReader(AsterixInputStream stream, String recStartString, String recEndString)
-            throws HyracksDataException {
-        super(stream);
+    @Override
+    public void configure(AsterixInputStream stream, Map<String, String> config) throws HyracksDataException {
+        super.configure(stream);
+        String recStartString = config.get(ExternalDataConstants.KEY_RECORD_START);
+        String recEndString = config.get(ExternalDataConstants.KEY_RECORD_END);
         // set record opening char
         if (recStartString != null) {
             if (recStartString.length() != 1) {
@@ -153,6 +163,16 @@
     }
 
     @Override
+    public List<String> getRecordReaderFormats() {
+        return recordReaderFormats;
+    }
+
+    @Override
+    public String getRequiredConfigs() {
+        return REQUIRED_CONFIGS;
+    }
+
+    @Override
     public boolean stop() {
         try {
             reader.stop();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 7dc5bce..5629f48 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -19,6 +19,8 @@
 package org.apache.asterix.external.input.record.reader.stream;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IRawRecord;
@@ -29,20 +31,18 @@
 import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler {
-    protected final AsterixInputStreamReader reader;
+    protected AsterixInputStreamReader reader;
     protected CharArrayRecord record;
     protected char[] inputBuffer;
     protected int bufferLength = 0;
     protected int bufferPosn = 0;
     protected boolean done = false;
     protected FeedLogManager feedLogManager;
-    protected MutableBoolean newFile = new MutableBoolean(false);
 
-    public StreamRecordReader(AsterixInputStream inputStream) {
+    public void configure(AsterixInputStream inputStream) {
         this.reader = new AsterixInputStreamReader(inputStream);
         record = new CharArrayRecord();
         inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
@@ -95,4 +95,11 @@
     public void notifyNewSource() {
         throw new UnsupportedOperationException();
     }
+
+    public abstract List<String> getRecordReaderFormats();
+
+    public abstract String getRequiredConfigs();
+
+    public abstract void configure(AsterixInputStream inputStream, Map<String, String> config)
+            throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 7d75af8..0591775 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -18,13 +18,24 @@
  */
 package org.apache.asterix.external.input.record.reader.stream;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
+import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
-import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -34,13 +45,12 @@
 public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> {
 
     private static final long serialVersionUID = 1L;
-    protected final IInputStreamFactory streamFactory;
+    protected IInputStreamFactory streamFactory;
     protected Map<String, String> configuration;
-    protected Format format;
-
-    public StreamRecordReaderFactory(IInputStreamFactory inputStreamFactory) {
-        this.streamFactory = inputStreamFactory;
-    }
+    protected Class recordReaderClazz;
+    private static final List<String> recordReaderNames = Collections.unmodifiableList(
+            Arrays.asList(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER, ExternalDataConstants.ALIAS_SOCKET_ADAPTER,
+                    ExternalDataConstants.SOCKET, ExternalDataConstants.STREAM_SOCKET_CLIENT));
 
     @Override
     public DataSourceType getDataSourceType() {
@@ -57,18 +67,45 @@
         return streamFactory.getPartitionConstraint();
     }
 
+    private void configureInputStreamFactory(Map<String, String> config) throws CompilationException {
+        String reader = config.get(ExternalDataConstants.KEY_READER);
+        if (reader.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER)) {
+            streamFactory = new LocalFSInputStreamFactory();
+        } else if (reader.equals(ExternalDataConstants.ALIAS_SOCKET_ADAPTER)
+                || reader.equals(ExternalDataConstants.SOCKET)) {
+            streamFactory = new SocketServerInputStreamFactory();
+        } else if (reader.equals(ExternalDataConstants.STREAM_SOCKET_CLIENT)) {
+            streamFactory = new SocketClientInputStreamFactory();
+        } else {
+            throw new CompilationException(ErrorCode.FEED_UNKNOWN_ADAPTER_NAME);
+        }
+    }
+
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration)
             throws HyracksDataException, AlgebricksException {
         this.configuration = configuration;
+        configureInputStreamFactory(configuration);
         streamFactory.configure(serviceCtx, configuration);
-        format = StreamRecordReaderProvider.getReaderFormat(configuration);
+        recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
     }
 
     @Override
     public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        return StreamRecordReaderProvider.createRecordReader(format, streamFactory.createInputStream(ctx, partition),
-                configuration);
+        try {
+            StreamRecordReader streamRecordReader = (StreamRecordReader) recordReaderClazz.getConstructor()
+                    .newInstance();
+            streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
+            return streamRecordReader;
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
+                | NoSuchMethodException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 6ff0b6c..a0b53fd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -18,12 +18,16 @@
  */
 package org.apache.asterix.external.input.record.reader.twitter;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
@@ -51,20 +55,25 @@
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     private transient IServiceContext serviceCtx;
 
-    public static boolean isTwitterPull(Map<String, String> configuration) {
-        String reader = configuration.get(ExternalDataConstants.KEY_READER);
-        if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
-                || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
-            return true;
-        }
-        return false;
-    }
+    private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList(
+            ExternalDataConstants.READER_TWITTER_PULL,
+            ExternalDataConstants.READER_TWITTER_PUSH,
+            ExternalDataConstants.READER_PUSH_TWITTER,
+            ExternalDataConstants.READER_PULL_TWITTER,
+            ExternalDataConstants.READER_USER_STREAM_TWITTER));
+
 
     @Override
     public DataSourceType getDataSourceType() {
         return DataSourceType.RECORDS;
     }
 
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
         clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(
@@ -75,6 +84,12 @@
 
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration) throws AsterixException {
+        try {
+            Class.forName("twitter4j.Twitter");
+        } catch (ClassNotFoundException e) {
+            throw new AsterixException(ErrorCode.ADAPTER_TWITTER_TWITTER4J_LIB_NOT_FOUND, e);
+        }
+
         this.configuration = configuration;
         this.serviceCtx = serviceCtx;
         TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
index b0a1db2..489cf77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
@@ -27,10 +27,15 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 public class ADMDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
 
     private static final long serialVersionUID = 1L;
-    private static String[] formats = { "adm", "json", "semi-structured" };
+    private static final List<String> parserFormats = Collections
+            .unmodifiableList(Arrays.asList("adm", "json", "semi-structured"));
 
     @Override
     public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) {
@@ -58,8 +63,7 @@
     }
 
     @Override
-    public String[] getFormats() {
-        return formats;
+    public List<String> getParserFormats() {
+        return parserFormats;
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
index 0c9fd34..bba8ccc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.parser.factory;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -36,7 +39,8 @@
 public class DelimitedDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
 
     private static final long serialVersionUID = 1L;
-    private static String[] formats = { "csv", "delimited-text" };
+    private static final List<String> parserFormats = Collections
+            .unmodifiableList(Arrays.asList("csv", "delimited-text"));
 
     @Override
     public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
@@ -102,8 +106,7 @@
     }
 
     @Override
-    public String[] getFormats() {
-        return formats;
+    public List<String> getParserFormats() {
+        return parserFormats;
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
index 8914152..c944554 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/HiveDataParserFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.parser.factory;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -35,7 +38,8 @@
 public class HiveDataParserFactory implements IRecordDataParserFactory<Writable> {
 
     private static final long serialVersionUID = 1L;
-    private static String[] formats = { "hive", "hive-parser"};
+    private static final List<String> parserFormats = Collections
+            .unmodifiableList(Arrays.asList("hive", "hive-parser"));
     private Map<String, String> configuration;
     private ARecordType recordType;
     private String hiveSerdeClassName;
@@ -73,8 +77,8 @@
     public void setMetaType(ARecordType metaType) {
     }
 
-    @Override public String[] getFormats() {
-        return formats;
+    @Override
+    public List<String> getParserFormats() {
+        return parserFormats;
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
index 25308f2..7f0d006 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RSSParserFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.parser.factory;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.external.api.IRecordDataParser;
@@ -31,7 +34,7 @@
 public class RSSParserFactory implements IRecordDataParserFactory<SyndEntryImpl> {
 
     private static final long serialVersionUID = 1L;
-    private static String[] formats = { "rss" };
+    private static final List<String> parserFormats = Collections.unmodifiableList(Arrays.asList("rss"));
     private ARecordType recordType;
 
     @Override
@@ -60,8 +63,8 @@
     }
 
     @Override
-    public String[] getFormats() {
-        return formats;
+    public List<String> getParserFormats() {
+        return parserFormats;
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
index 06de407..c7bf447 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.parser.factory;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -38,7 +41,8 @@
 public class RecordWithMetadataParserFactory<I, O> implements IRecordDataParserFactory<I> {
 
     private static final long serialVersionUID = 1L;
-    private static String[] formats = { "record-with-metadata" };
+    private static final List<String> parserFormats = Collections
+            .unmodifiableList(Arrays.asList("record-with-metadata"));
     private ARecordType metaType;
     private ARecordType recordType;
     private IRecordDataParserFactory<O> recordParserFactory;
@@ -83,8 +87,8 @@
     }
 
     @Override
-    public String[] getFormats() {
-        return formats;
+    public List<String> getParserFormats() {
+        return parserFormats;
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index 771f56a..7d2c34b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.parser.factory;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.external.api.IRecordDataParser;
@@ -31,7 +34,7 @@
 public class TweetParserFactory implements IRecordDataParserFactory<String> {
 
     private static final long serialVersionUID = 1L;
-    private static String[] formats = { "twitter-status" };
+    private static final List<String> parserFormats = Collections.unmodifiableList(Arrays.asList("twitter-status"));
     private ARecordType recordType;
 
     @Override
@@ -61,8 +64,8 @@
     }
 
     @Override
-    public String[] getFormats() {
-        return formats;
+    public List<String> getParserFormats() {
+        return parserFormats;
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 41fc7e4..859c9fd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -18,8 +18,17 @@
  */
 package org.apache.asterix.external.provider;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -27,26 +36,23 @@
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.HDFSDataSourceFactory;
-import org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
 import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
-import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
 import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.commons.io.IOUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import twitter4j.conf.ConfigurationBuilder;
 
 public class DatasourceFactoryProvider {
 
+    private static final String RESOURCE = "META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory";
+    private static Map<String, Class> factories = null;
+
     private DatasourceFactoryProvider() {
     }
 
     public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager,
-            Map<String, String> configuration) throws HyracksDataException {
+            Map<String, String> configuration) throws HyracksDataException, AsterixException {
         if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) {
             String reader = configuration.get(ExternalDataConstants.KEY_READER);
             return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration);
@@ -89,46 +95,64 @@
         return streamSourceFactory;
     }
 
-    public static IRecordReaderFactory<?> getRecordReaderFactory(ILibraryManager libraryManager, String reader,
-            Map<String, String> configuration) throws HyracksDataException {
-        if (reader.equals(ExternalDataConstants.EXTERNAL)) {
-            try {
-                return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration);
-            } catch (AlgebricksException e) {
-                // Not sure whether this is the right way to handle AlgebricksException  (xikui)
-                throw new HyracksDataException(e);
+    protected static IRecordReaderFactory getInstance(Class clazz) throws AsterixException {
+        try {
+            return (IRecordReaderFactory) clazz.newInstance();
+        } catch (InstantiationException | IllegalAccessException | ClassCastException e) {
+            throw new AsterixException("Cannot create: " + clazz.getSimpleName(), e);
+        }
+    }
+
+    public static IRecordReaderFactory getRecordReaderFactory(ILibraryManager libraryManager, String adaptorName,
+            Map<String, String> configuration) throws HyracksDataException, AsterixException {
+        if (adaptorName.equals(ExternalDataConstants.EXTERNAL)) {
+            return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration);
+        }
+
+        if (factories == null) {
+            factories = initFactories();
+        }
+
+        if (factories.containsKey(adaptorName)) {
+            return getInstance(factories.get(adaptorName));
+        }
+
+        try {
+            return (IRecordReaderFactory) Class.forName(adaptorName).newInstance();
+        } catch (IllegalAccessException | ClassNotFoundException | InstantiationException | ClassCastException e) {
+            throw new RuntimeDataException(ErrorCode.UNKNOWN_RECORD_READER_FACTORY, e, adaptorName);
+        }
+    }
+
+    protected static Map<String, Class> initFactories() throws AsterixException {
+        Map<String, Class> factories = new HashMap<>();
+        ClassLoader cl = ParserFactoryProvider.class.getClassLoader();
+        final Charset encoding = Charset.forName("UTF-8");
+        try {
+            Enumeration<URL> urls = cl.getResources(RESOURCE);
+            for (URL url : Collections.list(urls)) {
+                InputStream is = url.openStream();
+                String config = IOUtils.toString(is, encoding);
+                is.close();
+                String[] classNames = config.split("\n");
+                for (String className : classNames) {
+                    if (className.startsWith("#")) {
+                        continue;
+                    }
+                    final Class<?> clazz = Class.forName(className);
+                    List<String> formats = ((IRecordReaderFactory) clazz.newInstance()).getRecordReaderNames();
+                    for (String format : formats) {
+                        if (factories.containsKey(format)) {
+                            throw new AsterixException(ErrorCode.PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING,
+                                    format);
+                        }
+                        factories.put(format, clazz);
+                    }
+                }
             }
+        } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new AsterixException(e);
         }
-        switch (reader) {
-            case ExternalDataConstants.READER_HDFS:
-                return new HDFSDataSourceFactory();
-            case ExternalDataConstants.ALIAS_LOCALFS_ADAPTER:
-                return new StreamRecordReaderFactory(new LocalFSInputStreamFactory());
-            case ExternalDataConstants.READER_TWITTER_PULL:
-            case ExternalDataConstants.READER_TWITTER_PUSH:
-            case ExternalDataConstants.READER_PUSH_TWITTER:
-            case ExternalDataConstants.READER_PULL_TWITTER:
-            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
-                try {
-                    Class.forName("twitter4j.Twitter");
-                } catch (ClassNotFoundException e) {
-                    throw new RuntimeDataException(ErrorCode.ADAPTER_TWITTER_TWITTER4J_LIB_NOT_FOUND, e);
-                }
-                return new TwitterRecordReaderFactory();
-            case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
-            case ExternalDataConstants.SOCKET:
-                return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());
-            case ExternalDataConstants.STREAM_SOCKET_CLIENT:
-                return new StreamRecordReaderFactory(new SocketClientInputStreamFactory());
-            case ExternalDataConstants.READER_RSS:
-                return new RSSRecordReaderFactory();
-            default:
-                try {
-                    return (IRecordReaderFactory<?>) Class.forName(reader).newInstance();
-                } catch (IllegalAccessException | ClassNotFoundException | InstantiationException
-                        | ClassCastException e) {
-                    throw new RuntimeDataException(ErrorCode.UNKNOWN_RECORD_READER_FACTORY, e,reader);
-                }
-        }
+        return factories;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 0c9b835..c16ee89 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -102,7 +102,7 @@
                         continue;
                     }
                     final Class<?> clazz = Class.forName(className);
-                    String[] formats = ((IDataParserFactory) clazz.newInstance()).getFormats();
+                    List<String> formats = ((IDataParserFactory) clazz.newInstance()).getParserFormats();
                     for (String format : formats) {
                         if (factories.containsKey(format)) {
                             throw new AsterixException("Duplicate format " + format);
@@ -111,8 +111,7 @@
                     }
                 }
             }
-        } catch (IOException | ClassNotFoundException | InstantiationException
-                | IllegalAccessException e) {
+        } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
             throw new AsterixException(e);
         }
         return factories;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
index 261a63c..d6df81d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -18,66 +18,124 @@
  */
 package org.apache.asterix.external.provider;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReader;
-import org.apache.asterix.external.input.record.reader.stream.LineRecordReader;
-import org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader;
-import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
 import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
 
 public class StreamRecordReaderProvider {
-    public enum Format {
-        SEMISTRUCTURED,
-        CSV,
-        LINE_SEPARATED
+
+    private static final String RESOURCE = "META-INF/services/org.apache.asterix.external.input.record."
+            + "reader.stream.StreamRecordReader";
+    private static Map<String, List<Pair<String[], Class>>> recordReaders = null;
+
+    protected static StreamRecordReader getInstance(Class clazz) throws AsterixException {
+        try {
+            return (StreamRecordReader) clazz.newInstance();
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new AsterixException("Cannot create RecordReader: " + clazz.getSimpleName(), e);
+        }
     }
 
-    public static Format getReaderFormat(Map<String, String> configuration) throws AsterixException {
-        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
-        if (format != null) {
-            switch (format) {
-                case ExternalDataConstants.FORMAT_ADM:
-                case ExternalDataConstants.FORMAT_JSON:
-                case ExternalDataConstants.FORMAT_SEMISTRUCTURED:
-                    return Format.SEMISTRUCTURED;
-                case ExternalDataConstants.FORMAT_LINE_SEPARATED:
-                    return Format.LINE_SEPARATED;
-                case ExternalDataConstants.FORMAT_DELIMITED_TEXT:
-                case ExternalDataConstants.FORMAT_CSV:
-                    return Format.CSV;
+    private StreamRecordReaderProvider() {
+        // do nothing
+    }
+
+    public static Class findRecordReaderClazzWithConfig(Map<String, String> configuration, String format)
+            throws AsterixException {
+        List<Pair<String[], Class>> requiredConfigs = recordReaders.get(format);
+        Class clazz = null;
+        int maxOptNum = 0;
+        for (Pair<String[], Class> configPair : requiredConfigs) {
+            Boolean matchFlag = true;
+            int matchOptNum = 0;
+            // Here StreamRecordReaderProvider will try its best to find the most suitable
+            // record reader by matching options. However, if there is a option key error,
+            // it will be ignored and a potential wrong record reader with less required options
+            // will be returned.
+            for (String configKey : configPair.getLeft()) {
+                if (configKey.length() > 0 && !configuration.containsKey(configKey)) {
+                    matchFlag = false;
+                    break;
+                }
+                if (configKey.length() > 0) {
+                    matchOptNum++;
+                }
             }
-            throw new AsterixException("Unknown format: " + format);
+            if (matchFlag && matchOptNum >= maxOptNum) {
+                clazz = configPair.getRight();
+                maxOptNum = matchOptNum;
+            }
+        }
+        if (clazz != null) {
+            return clazz;
+        } else {
+            throw new AsterixException(ErrorCode.PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION, format);
+        }
+    }
+
+    public static Class getRecordReaderClazz(Map<String, String> configuration) throws AsterixException {
+        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+
+        if (recordReaders == null) {
+            recordReaders = initRecordReaders();
+        }
+
+        if (format != null) {
+            if (recordReaders.containsKey(format)) {
+                return findRecordReaderClazzWithConfig(configuration, format);
+            }
+            throw new AsterixException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, format);
         }
         throw new AsterixException("Unspecified parameter: " + ExternalDataConstants.KEY_FORMAT);
     }
 
-    public static StreamRecordReader createRecordReader(Format format, AsterixInputStream inputStream,
-            Map<String, String> configuration) throws HyracksDataException {
-        switch (format) {
-            case CSV:
-                String quoteString = configuration.get(ExternalDataConstants.KEY_QUOTE);
-                boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
-                if (quoteString != null) {
-                    return new QuotedLineRecordReader(hasHeader, inputStream, quoteString);
-                } else {
-                    return new LineRecordReader(hasHeader, inputStream);
+    protected static Map<String, List<Pair<String[], Class>>> initRecordReaders() throws AsterixException {
+        Map<String, List<Pair<String[], Class>>> recordReaders = new HashMap<>();
+        ClassLoader cl = StreamRecordReaderProvider.class.getClassLoader();
+        final Charset encoding = Charset.forName("UTF-8");
+        try {
+            Enumeration<URL> urls = cl.getResources(RESOURCE);
+            for (URL url : Collections.list(urls)) {
+                InputStream is = url.openStream();
+                String config = IOUtils.toString(is, encoding);
+                is.close();
+                String[] classNames = config.split("\n");
+                for (String className : classNames) {
+                    if (className.startsWith("#")) {
+                        continue;
+                    }
+                    final Class<?> clazz = Class.forName(className);
+                    StreamRecordReader newInstance = (StreamRecordReader) clazz.getConstructor().newInstance();
+                    List<String> formats = newInstance.getRecordReaderFormats();
+                    String[] configs = newInstance.getRequiredConfigs().split(":");
+                    for (String format : formats) {
+                        if (!recordReaders.containsKey(format)) {
+                            recordReaders.put(format, new ArrayList<>());
+                        }
+                        recordReaders.get(format).add(Pair.of(configs, clazz));
+                    }
                 }
-            case LINE_SEPARATED:
-                return new EmptyLineSeparatedRecordReader(inputStream);
-            case SEMISTRUCTURED:
-                return new SemiStructuredRecordReader(inputStream,
-                        configuration.get(ExternalDataConstants.KEY_RECORD_START),
-                        configuration.get(ExternalDataConstants.KEY_RECORD_END));
-            default:
-                throw new RuntimeDataException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, format);
+            }
+        } catch (IOException | ClassNotFoundException | InvocationTargetException | IllegalAccessException
+                | NoSuchMethodException | InstantiationException e) {
+            throw new AsterixException(e);
         }
+        return recordReaders;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
new file mode 100644
index 0000000..52e28be
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory
+org.apache.asterix.external.input.record.reader.rss.RSSRecordReaderFactory
+org.apache.asterix.external.input.HDFSDataSourceFactory
+org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader
new file mode 100644
index 0000000..504e7c0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.input.record.reader.stream.StreamRecordReader
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReader
+org.apache.asterix.external.input.record.reader.stream.LineRecordReader
+org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader
+org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index f90b971..74020f3 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -24,6 +24,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -38,6 +39,7 @@
 import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.library.ClassAdParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -89,7 +91,8 @@
     public void testSchemaful() {
         try {
             File file = new File("target/classad-wtih-temporals.adm");
-            File expected = new File(getClass().getResource("/classad/results/classad-with-temporals.adm").toURI().getPath());
+            File expected = new File(
+                    getClass().getResource("/classad/results/classad-with-temporals.adm").toURI().getPath());
             FileUtils.deleteQuietly(file);
             PrintStream printStream = new PrintStream(Files.newOutputStream(Paths.get(file.toURI())));
             String[] recordFieldNames = { "GlobalJobId", "Owner", "ClusterId", "ProcId", "RemoteWallClockTime",
@@ -109,15 +112,19 @@
                 printers[i] = printerFactories[i].createPrinter();
             }
             ClassAdObjectPool objectPool = new ClassAdObjectPool();
-            String[] files = new String[] {"/classad/classad-with-temporals.classads"};
+            String[] files = new String[] { "/classad/classad-with-temporals.classads" };
             ClassAdParser parser = new ClassAdParser(recordType, false, false, false, null, null, null, objectPool);
             ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
             for (String path : files) {
                 List<Path> paths = new ArrayList<>();
+                Map<String, String> config = new HashMap<>();
+                config.put(ExternalDataConstants.KEY_RECORD_START, "[");
+                config.put(ExternalDataConstants.KEY_RECORD_END, "]");
                 paths.add(Paths.get(getClass().getResource(path).toURI()));
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
-                SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]");
+                SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
+                recordReader.configure(in, config);
                 while (recordReader.hasNext()) {
                     tb.reset();
                     IRawRecord<char[]> record = recordReader.next();
@@ -144,15 +151,19 @@
         try {
             ClassAdObjectPool objectPool = new ClassAdObjectPool();
             ClassAd pAd = new ClassAd(objectPool);
-            String[] files = new String[] {"/classad/escapes.txt"};
+            String[] files = new String[] { "/classad/escapes.txt" };
             ClassAdParser parser = new ClassAdParser(objectPool);
             CharArrayLexerSource lexerSource = new CharArrayLexerSource();
             for (String path : files) {
                 List<Path> paths = new ArrayList<>();
+                Map<String, String> config = new HashMap<>();
+                config.put(ExternalDataConstants.KEY_RECORD_START, "[");
+                config.put(ExternalDataConstants.KEY_RECORD_END, "]");
                 paths.add(Paths.get(getClass().getResource(path).toURI()));
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
-                SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]");
+                SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
+                recordReader.configure(in, config);
                 try {
                     Value val = new Value(objectPool);
                     while (recordReader.hasNext()) {
@@ -182,15 +193,19 @@
         try {
             ClassAdObjectPool objectPool = new ClassAdObjectPool();
             ClassAd pAd = new ClassAd(objectPool);
-            String[] files = new String[] {"/classad/jobads.txt"};
+            String[] files = new String[] { "/classad/jobads.txt" };
             ClassAdParser parser = new ClassAdParser(objectPool);
             CharArrayLexerSource lexerSource = new CharArrayLexerSource();
             for (String path : files) {
                 List<Path> paths = new ArrayList<>();
+                Map<String, String> config = new HashMap<>();
+                config.put(ExternalDataConstants.KEY_RECORD_START, "[");
+                config.put(ExternalDataConstants.KEY_RECORD_END, "]");
                 paths.add(Paths.get(getClass().getResource(path).toURI()));
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
-                SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]");
+                SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
+                recordReader.configure(in, config);
                 try {
                     Value val = new Value(objectPool);
                     while (recordReader.hasNext()) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
index 12652f3..c9e37ee 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.input.record.reader;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.api.IApplicationContext;
@@ -35,6 +38,8 @@
     private static final long serialVersionUID = 1L;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     private transient IServiceContext serviceCtx;
+    private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList());
+
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
@@ -58,4 +63,8 @@
     public Class<?> getRecordClass() {
         return RecordWithPK.class;
     }
+
+    @Override public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index d2e0281..c45941d 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.input.record.reader.kv;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.external.api.IRecordReader;
@@ -40,6 +43,7 @@
     private int upsertCycle = 0;
     private int numOfReaders;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+    private static final List<String> recordReaderNames = Collections.unmodifiableList(Arrays.asList());
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
@@ -78,4 +82,8 @@
     public Class<?> getRecordClass() {
         return DCPRequest.class;
     }
+
+    @Override public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParserFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParserFactory.java
index d8cc3bb..ea4024b 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParserFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParserFactory.java
@@ -21,6 +21,9 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.external.api.IRecordDataParser;
@@ -34,7 +37,7 @@
 public class ClassAdParserFactory implements IRecordDataParserFactory<char[]> {
 
     private static final long serialVersionUID = 1L;
-    private static final String[] formats = { "line-separated" };
+    private static final List<String> parserFormats = Collections.unmodifiableList(Arrays.asList("line-separated"));
     public static final String KEY_OLD_FORMAT = "old-format";
     public static final String KEY_EVALUATE = "evaluate";
     public static final String KEY_KEEP_EXPR = "keep-expr";
@@ -123,8 +126,8 @@
     }
 
     @Override
-    public String[] getFormats() {
-        return formats;
+    public List<String> getParserFormats() {
+        return parserFormats;
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
index 91919d1..7284979 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.parser.factory;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -38,7 +39,7 @@
 public class TestRecordWithPKParserFactory<T> implements IRecordDataParserFactory<RecordWithPK<T>> {
 
     private static final long serialVersionUID = 1L;
-    private static final ArrayList<String> formats = new ArrayList<>();
+    private static final ArrayList<String> parserFormats = new ArrayList<>();
     private ARecordType recordType;
     private IRecordDataParserFactory<char[]> recordParserFactory;
     private String format;
@@ -51,7 +52,7 @@
     public void configure(Map<String, String> configuration) throws AsterixException {
         TreeMap<String, String> parserConf = new TreeMap<String, String>();
         format = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT);
-        formats.add(format);
+        parserFormats.add(format);
         parserConf.put(ExternalDataConstants.KEY_FORMAT, format);
         recordParserFactory =
                 (IRecordDataParserFactory<char[]>) ParserFactoryProvider.getDataParserFactory(null, parserConf);
@@ -80,8 +81,8 @@
     }
 
     @Override
-    public String[] getFormats() {
-        return (String[]) formats.toArray();
+    public List<String> getParserFormats() {
+        return parserFormats;
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
index c238f1c..2ba5a3e 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
@@ -30,13 +30,16 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.input.record.CharArrayRecord;
 import org.apache.asterix.external.input.record.converter.DCPMessageToRecordConverter;
 import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.junit.Assert;
 import org.junit.Test;
@@ -73,10 +76,14 @@
     public void testDecodingJsonRecords() throws URISyntaxException, IOException {
         String jsonFileName = "/record.json";
         List<Path> paths = new ArrayList<>();
+        Map<String, String> config = new HashMap<>();
+        config.put(ExternalDataConstants.KEY_RECORD_START, "{");
+        config.put(ExternalDataConstants.KEY_RECORD_END, "}");
         paths.add(Paths.get(getClass().getResource(jsonFileName).toURI()));
         FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
         LocalFSInputStream in = new LocalFSInputStream(watcher);
-        try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "{", "}")) {
+        try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader()) {
+            recordReader.configure(in, config);
             while (recordReader.hasNext()) {
                 try {
                     IRawRecord<char[]> record = recordReader.next();
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index a782ad0..1584065 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -24,11 +24,13 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.input.record.converter.CSVToRecordWithMetadataAndPKConverter;
-import org.apache.asterix.external.input.record.reader.stream.QuotedLineRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.LineRecordReader;
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.parser.RecordWithMetadataParser;
@@ -84,8 +86,11 @@
             // create input stream
             LocalFSInputStream inputStream = new LocalFSInputStream(watcher);
             // create reader record reader
-            QuotedLineRecordReader lineReader = new QuotedLineRecordReader(true, inputStream,
-                    ExternalDataConstants.DEFAULT_QUOTE);
+            Map<String, String> config = new HashMap<>();
+            config.put(ExternalDataConstants.KEY_HEADER, "true");
+            config.put(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE);
+            LineRecordReader lineReader = new LineRecordReader();
+            lineReader.configure(inputStream, config);
             // create csv with json record reader
             CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter(
                     valueIndex, delimiter, metaType, recordType, pkIndicators, pkIndexes, keyTypes);
@@ -105,7 +110,7 @@
             serdes[1] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
             printerFactories[0] = ADMPrinterFactoryProvider.INSTANCE.getPrinterFactory(recordType);
             printerFactories[1] = ADMPrinterFactoryProvider.INSTANCE.getPrinterFactory(metaType);
-            // create output descriptor 
+            // create output descriptor
             IPrinter[] printers = new IPrinter[printerFactories.length];
 
             for (int i = 0; i < printerFactories.length; i++) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
new file mode 100644
index 0000000..71a5072
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/StreamRecordReaderProviderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.test;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StreamRecordReaderProviderTest {
+
+    @Test
+    public void Test() throws AsterixException{
+        List<String> recordReaderFormats = Arrays.asList(
+                ExternalDataConstants.FORMAT_LINE_SEPARATED,
+                ExternalDataConstants.FORMAT_ADM,
+                ExternalDataConstants.FORMAT_JSON,
+                ExternalDataConstants.FORMAT_SEMISTRUCTURED,
+                ExternalDataConstants.FORMAT_DELIMITED_TEXT,
+                ExternalDataConstants.FORMAT_CSV);
+        Map<String, String> config = new HashMap<>();
+        for (String format : recordReaderFormats) {
+            config.clear();
+            config.put(ExternalDataConstants.KEY_FORMAT, format);
+            Class clazz = StreamRecordReaderProvider.getRecordReaderClazz(config);
+            Assert.assertTrue(clazz != null);
+        }
+        config.clear();
+        config.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_CSV);
+        config.put(ExternalDataConstants.KEY_QUOTE, "\u0000");
+        Assert.assertTrue(StreamRecordReaderProvider.getRecordReaderClazz(config) != null);
+    }
+}