[ASTERIXDB-2713][EXT] CSV & TSV support for external dataset p2

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
 - validate adapter configuration
 - ignore empty lines in CSV/TSV files
 - require "header" parameter for CSV/TSV formats
 - make some parameters case-insensitive
 - few fixes and clean-ups

Change-Id: I2f523de0d482a358ada0c27236ff24616ad0d7da
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5848
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/data/csv/empty.csv b/asterixdb/asterix-app/data/csv/empty.csv
new file mode 100644
index 0000000..3f2ff2d
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/empty.csv
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/asterixdb/asterix-app/data/csv/sample_12.csv b/asterixdb/asterix-app/data/csv/sample_12.csv
index 2ab7c6d..0c9baf5 100644
--- a/asterixdb/asterix-app/data/csv/sample_12.csv
+++ b/asterixdb/asterix-app/data/csv/sample_12.csv
@@ -1,3 +1,4 @@
+f1,f2,f3
 1,true,"text"
 2,false,"text"
 3,true,"text"
diff --git a/asterixdb/asterix-app/data/csv/sample_13.csv b/asterixdb/asterix-app/data/csv/sample_13.csv
new file mode 100644
index 0000000..9f53f56
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_13.csv
@@ -0,0 +1,11 @@
+
+
+f1,f2,f3,f4
+
+1,,"good","recommend"
+
+2,,"bad","not recommend"
+3,,"good",
+
+
+
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 9d427ec..3906bd5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -723,7 +723,9 @@
                 case EXTERNAL:
                     ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
                     Map<String, String> properties = createExternalDatasetProperties(dd, metadataProvider, mdTxnCtx);
-                    ExternalDataUtils.defaultConfiguration(properties);
+                    ExternalDataUtils.normalize(properties);
+                    ExternalDataUtils.validate(properties);
+                    validateExternalDatasetDetails(externalDetails, properties);
                     datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
                             TransactionState.COMMIT);
                     break;
@@ -1970,9 +1972,12 @@
         MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName);
         try {
+            Map<String, String> properties = loadStmt.getProperties();
+            ExternalDataUtils.normalize(properties);
+            ExternalDataUtils.validate(properties);
             CompiledLoadFromFileStatement cls =
                     new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
-                            loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
+                            loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
             cls.setSourceLocation(stmt.getSourceLocation());
             JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
                     null, responsePrinter, warningCollector);
@@ -2170,7 +2175,10 @@
                             "A feed with this name " + feedName + " already exists.");
                 }
             }
-            feed = new Feed(dataverseName, feedName, cfs.getConfiguration());
+            Map<String, String> configuration = cfs.getConfiguration();
+            ExternalDataUtils.normalize(configuration);
+            ExternalDataUtils.validate(configuration);
+            feed = new Feed(dataverseName, feedName, configuration);
             FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx);
             MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3177,4 +3185,14 @@
             throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
         }
     }
+
+    protected void validateExternalDatasetDetails(ExternalDetailsDecl externalDetails, Map<String, String> properties)
+            throws RuntimeDataException {
+        String adapter = externalDetails.getAdapter();
+        // "format" parameter is needed for "S3" data source
+        if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(adapter)
+                && properties.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_FORMAT);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
index 5728e78..f7fe18c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
@@ -25,8 +25,11 @@
 CREATE TYPE t1 AS {f1: string, f2: string, f3: string, f4: string, f5: string};
 CREATE TYPE t2 AS {f1: string, f2: string, f3: string};
 CREATE TYPE t3 AS {f1: int?, f2: boolean, f3: string?};
+CREATE TYPE t4 AS {f1: string, f2: string, f3: string, f4: string};
 
-CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="csv"));
-CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sample_10.csv"), ("format"="csv"));
-CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"));
-CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"));
\ No newline at end of file
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="CSV"), ("header"="FALSE"));
+CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sample_10.csv"), ("format"="Csv"), ("header"="False"));
+CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"), ("header"="FALSE"));
+CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"));
+CREATE EXTERNAL DATASET ds5(t4) USING localfs(("path"="asterix_nc1://data/csv/sample_13.csv"), ("format"="csv"), ("header"="True"));
+CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty.csv"), ("format"="csv"), ("header"="false"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp
new file mode 100644
index 0000000..a3d113d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds5 v SELECT VALUE v ORDER BY v.f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp
new file mode 100644
index 0000000..2e5b312
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds6 v SELECT VALUE v ORDER BY v.f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
index c0faf16..cabe54b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
@@ -24,4 +24,4 @@
 
 CREATE TYPE t1 AS {f1: int, f2: int, f3: string, f4: boolean, f5: bigint, f6: double};
 
-CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/tsv/sample_01.tsv"), ("format"="tsv"))
\ No newline at end of file
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/tsv/sample_01.tsv"), ("format"="tsv"), ("header"="FALSE"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
index b906039..6184b19 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
@@ -32,5 +32,6 @@
 ("serviceEndpoint"="http://localhost:8001"),
 ("container"="playground"),
 ("definition"="csv-data/reviews"),
-("format"="csv")
+("format"="Csv"),
+("header"="false")
 );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
index d385bee..194adf6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
@@ -32,5 +32,6 @@
 ("serviceEndpoint"="http://localhost:8001"),
 ("container"="playground"),
 ("definition"="tsv-data/reviews"),
-("format"="tsv")
+("format"="TSV"),
+("header"="False")
 );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp
new file mode 100644
index 0000000..e0fc056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// "format" parameter is missing for S3
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+DROP TYPE test IF EXISTS;
+CREATE TYPE test AS {id: int, year: int?, review: string, details: string?};
+
+DROP DATASET test IF EXISTS;
+CREATE EXTERNAL DATASET test(test) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="tsv-data/reviews")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm
new file mode 100644
index 0000000..9a1d1c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm
@@ -0,0 +1,3 @@
+{ "f1": "1", "f2": "", "f3": "good", "f4": "recommend" }
+{ "f1": "2", "f2": "", "f3": "bad", "f4": "not recommend" }
+{ "f1": "3", "f2": "", "f3": "good", "f4": "" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
index 9948209..2456f13 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -17,7 +17,7 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp">
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
   <test-group name="external-dataset">
     <test-case FilePath="external-dataset">
       <compilation-unit name="aws/s3/000">
@@ -34,5 +34,11 @@
         <output-dir compare="Text">aws/s3/002</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="aws/s3/negative">
+        <output-dir compare="Text">aws/s3/negative</output-dir>
+        <expected-error>Parameter(s) format must be specified</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index da1c4e7..14f7c83 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -241,7 +241,7 @@
     public static final int LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_NAME = 3047;
     public static final int OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME = 3048;
     public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER = 3049;
-    public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE = 3050;
+    public static final int PARSER_INVALID_CHAR_LENGTH = 3050;
     public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH = 3051;
     public static final int INDEXING_EXTERNAL_FILE_INDEX_ACCESSOR_UNABLE_TO_FIND_FILE_INDEX = 3052;
     public static final int PARSER_ADM_DATA_PARSER_FIELD_NOT_NULL = 3053;
@@ -307,6 +307,8 @@
     public static final int FAILED_TO_PARSE_METADATA = 3115;
     public static final int INPUT_DECODE_FAILURE = 3116;
     public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
+    public static final int PARAMETERS_REQUIRED = 3118;
+    public static final int MALFORMED_RECORD = 3119;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 457206d..98ae1a7 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -239,7 +239,7 @@
 3047 = External %1$s not supported
 3048 = Invalid feed runtime: %1$s
 3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1
-3050 = '%1$s' is not a valid quote. The length of a quote should be 1
+3050 = '%1$s' is not a valid %2$s. The length of %2$s should be 1
 3051 = Quote '%1$s' cannot be used with the delimiter '%2$s'
 3052 = Was not able to find a file in the files index
 3053 = Field %1$s can not be null
@@ -305,6 +305,8 @@
 3115 = Failed to parse record metadata
 3116 = Failed to decode input
 3117 = Failed to parse record, malformed log record
+3118 = Parameter(s) %1$s must be specified
+3119 = Record number %1$s is malformed
 
 # Lifecycle management errors
 4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index f830376..7702dde 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -205,7 +205,7 @@
             IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
             if (recordReaderClazz != null) {
                 StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
-                streamReader.configure(createInputStream(ctx, partition, indexer), configuration);
+                streamReader.configure(ctx, createInputStream(ctx, partition, indexer), configuration);
                 if (indexer != null) {
                     return new IndexingStreamRecordReader(streamReader, indexer);
                 } else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index 65ecd8d..aa4abb4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -134,4 +134,10 @@
         strValue.getChars(0, strValue.length(), value, 0);
         this.size = strValue.length();
     }
+
+    public boolean isEmptyRecord() {
+        return size <= 0
+                || (size == 1 && (value[0] == ExternalDataConstants.LF || value[0] == ExternalDataConstants.CR))
+                || (size == 2 && value[0] == ExternalDataConstants.CR && value[1] == ExternalDataConstants.LF);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
index e78783a..6484d4e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
@@ -18,20 +18,16 @@
  */
 package org.apache.asterix.external.input.record.reader.aws;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class AwsS3ReaderFactory extends StreamRecordReaderFactory {
@@ -73,18 +69,4 @@
         // record reader
         recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
     }
-
-    @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        try {
-            StreamRecordReader streamRecordReader =
-                    (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
-            streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
-            return streamRecordReader;
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
-                | NoSuchMethodException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 07b6250..24a68a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -26,6 +26,7 @@
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
 
@@ -135,7 +136,7 @@
     }
 
     @Override
-    public void configure(AsterixInputStream inputStream, Map<String, String> config) {
+    public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) {
         super.configure(inputStream);
         this.config = config;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index be600ed..a27397e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class LineRecordReader extends StreamRecordReader {
@@ -42,10 +43,12 @@
     private static final String REQUIRED_CONFIGS = "";
 
     @Override
-    public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
+    public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
+            throws HyracksDataException {
         super.configure(inputStream);
         this.hasHeader = ExternalDataUtils.hasHeader(config);
         if (hasHeader) {
+            // TODO(ali): revisit this and notifyNewSource
             inputStream.setNotificationHandler(this);
         }
     }
@@ -100,13 +103,16 @@
                     startPosn = bufferPosn = 0;
                     bufferLength = reader.read(inputBuffer);
                     if (bufferLength <= 0) {
-                        if (readLength > 0) {
-                            record.endRecord();
-                            recordNumber++;
-                            return true;
+                        if (readLength <= 0) {
+                            close();
+                            return false; //EOF
                         }
-                        close();
-                        return false; //EOF
+                        record.endRecord();
+                        if (record.isEmptyRecord()) {
+                            return false;
+                        }
+                        recordNumber++;
+                        return true;
                     }
                 }
                 for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
@@ -130,6 +136,9 @@
                     record.append(inputBuffer, startPosn, readLength);
                 }
             } while (newlineLength == 0);
+            if (record.isEmptyRecord()) {
+                continue;
+            }
             if (nextIsHeader) {
                 nextIsHeader = false;
                 continue;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 1fd328b..564df4b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -24,39 +24,35 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class QuotedLineRecordReader extends LineRecordReader {
 
     private char quote;
     private char quoteEscape;
+    private IWarningCollector warningCollector;
+    private final SourceLocation srcLoc = new SourceLocation(-1, -1);
     private static final List<String> recordReaderFormats = Collections.unmodifiableList(
             Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
     private static final String REQUIRED_CONFIGS = ExternalDataConstants.KEY_QUOTE;
 
     @Override
-    public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
-        super.configure(inputStream, config);
+    public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
+            throws HyracksDataException {
+        super.configure(ctx, inputStream, config);
+        this.warningCollector = ctx.getWarningCollector();
         String quoteString = config.get(ExternalDataConstants.KEY_QUOTE);
-        if (quoteString.length() != 1) {
-            throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE,
-                    ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
-        }
+        ExternalDataUtils.validateQuote(quoteString);
         this.quote = quoteString.charAt(0);
-        String escapeString = config.get(ExternalDataConstants.KEY_QUOTE_ESCAPE);
-        if (escapeString == null) {
-            quoteEscape = ExternalDataConstants.ESCAPE;
-        } else {
-            if (escapeString.length() != 1) {
-                throw new HyracksDataException(
-                        ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE_ESCAPE,
-                                ExternalDataConstants.PARAMETER_OF_SIZE_ONE, escapeString));
-            }
-            quoteEscape = escapeString.charAt(0);
-        }
+        this.quoteEscape = ExternalDataUtils.validateGetQuoteEscape(config);
     }
 
     @Override
@@ -87,16 +83,22 @@
                     startPosn = bufferPosn = 0;
                     bufferLength = reader.read(inputBuffer);
                     if (bufferLength <= 0) {
-                        if (readLength > 0) {
-                            if (inQuote) {
-                                throw new IOException("malformed input record ended inside quote");
+                        // reached end of stream
+                        if (readLength <= 0 || inQuote) {
+                            // haven't read anything previously OR have read and in the middle and hit the end
+                            if (inQuote && warningCollector.shouldWarn()) {
+                                warningCollector
+                                        .warn(WarningUtil.forAsterix(srcLoc, ErrorCode.MALFORMED_RECORD, recordNumber));
                             }
-                            record.endRecord();
-                            recordNumber++;
-                            return true;
+                            close();
+                            return false;
                         }
-                        close();
-                        return false;
+                        record.endRecord();
+                        if (record.isEmptyRecord()) {
+                            return false;
+                        }
+                        recordNumber++;
+                        return true;
                     }
                 }
                 boolean maybeInQuote = false;
@@ -121,12 +123,9 @@
                             // this is an opening quote
                             inQuote = true;
                         }
-                        if (prevCharEscape) {
-                            prevCharEscape = false;
-                        } else {
-                            // the quoteEscape != quote is for making an opening quote not an escape
-                            prevCharEscape = inputBuffer[bufferPosn] == quoteEscape && quoteEscape != quote;
-                        }
+                        // the quoteEscape != quote is for making an opening quote not an escape
+                        prevCharEscape =
+                                inputBuffer[bufferPosn] == quoteEscape && !prevCharEscape && quoteEscape != quote;
                     } else {
                         // if quote == quoteEscape and current char is quote, then it could be closing or escaping
                         if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
@@ -146,6 +145,9 @@
                     record.append(inputBuffer, startPosn, readLength);
                 }
             } while (newlineLength == 0);
+            if (record.isEmptyRecord()) {
+                continue;
+            }
             if (nextIsHeader) {
                 nextIsHeader = false;
                 continue;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 883f0ee..5ab5730 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class SemiStructuredRecordReader extends StreamRecordReader {
@@ -45,7 +46,8 @@
     private static final String REQUIRED_CONFIGS = "";
 
     @Override
-    public void configure(AsterixInputStream stream, Map<String, String> config) throws HyracksDataException {
+    public void configure(IHyracksTaskContext ctx, AsterixInputStream stream, Map<String, String> config)
+            throws HyracksDataException {
         super.configure(stream);
         String recStartString = config.get(ExternalDataConstants.KEY_RECORD_START);
         String recEndString = config.get(ExternalDataConstants.KEY_RECORD_END);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 5629f48..4aed741 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler {
@@ -100,6 +101,6 @@
 
     public abstract String getRequiredConfigs();
 
-    public abstract void configure(AsterixInputStream inputStream, Map<String, String> config)
+    public abstract void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
             throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 076842e..9bb50f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -95,7 +95,7 @@
         try {
             StreamRecordReader streamRecordReader =
                     (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
-            streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
+            streamRecordReader.configure(ctx, streamFactory.createInputStream(ctx, partition), configuration);
             return streamRecordReader;
         } catch (InstantiationException | IllegalAccessException | InvocationTargetException
                 | NoSuchMethodException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 8facce6..505acbd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
@@ -43,22 +44,23 @@
 
 public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
 
+    private final IHyracksTaskContext ctx;
     private final char fieldDelimiter;
     private final char quote;
     private final boolean hasHeader;
-    private ARecordType recordType;
-    private IARecordBuilder recBuilder;
-    private ArrayBackedValueStorage fieldValueBuffer;
-    private DataOutput fieldValueBufferOutput;
-    private IValueParser[] valueParsers;
+    private final ARecordType recordType;
+    private final IARecordBuilder recBuilder;
+    private final ArrayBackedValueStorage fieldValueBuffer;
+    private final DataOutput fieldValueBufferOutput;
+    private final IValueParser[] valueParsers;
     private FieldCursorForDelimitedDataParser cursor;
-    private byte[] fieldTypeTags;
-    private int[] fldIds;
-    private ArrayBackedValueStorage[] nameBuffers;
-    private boolean areAllNullFields;
+    private final byte[] fieldTypeTags;
+    private final int[] fldIds;
+    private final ArrayBackedValueStorage[] nameBuffers;
 
-    public DelimitedDataParser(IValueParserFactory[] valueParserFactories, char fieldDelimiter, char quote,
-            boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
+    public DelimitedDataParser(IHyracksTaskContext ctx, IValueParserFactory[] valueParserFactories, char fieldDelimiter,
+            char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
+        this.ctx = ctx;
         this.fieldDelimiter = fieldDelimiter;
         this.quote = quote;
         this.hasHeader = hasHeader;
@@ -107,10 +109,8 @@
         try {
             while (cursor.nextRecord()) {
                 parseRecord();
-                if (!areAllNullFields) {
-                    recBuilder.write(out, true);
-                    return true;
-                }
+                recBuilder.write(out, true);
+                return true;
             }
             return false;
         } catch (IOException e) {
@@ -121,7 +121,6 @@
     private void parseRecord() throws HyracksDataException {
         recBuilder.reset(recordType);
         recBuilder.init();
-        areAllNullFields = true;
 
         for (int i = 0; i < valueParsers.length; ++i) {
             try {
@@ -152,7 +151,6 @@
                     }
                     valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength(),
                             fieldValueBufferOutput);
-                    areAllNullFields = false;
                 }
                 if (fldIds[i] < 0) {
                     recBuilder.addField(nameBuffers[i], fieldValueBuffer);
@@ -172,9 +170,7 @@
     public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
         cursor.nextRecord(record.get(), record.size());
         parseRecord();
-        if (!areAllNullFields) {
-            recBuilder.write(out, true);
-        }
+        recBuilder.write(out, true);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
index 1fee49f..46c5152 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -42,15 +42,15 @@
 
     @Override
     public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
-        return createParser();
+        return createParser(ctx);
     }
 
-    private DelimitedDataParser createParser() throws HyracksDataException {
+    private DelimitedDataParser createParser(IHyracksTaskContext ctx) throws HyracksDataException {
         IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType);
-        char delimiter = ExternalDataUtils.getDelimiter(configuration);
-        char quote = ExternalDataUtils.getQuote(configuration, delimiter);
+        char delimiter = ExternalDataUtils.validateGetDelimiter(configuration);
+        char quote = ExternalDataUtils.validateGetQuote(configuration, delimiter);
         boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
-        return new DelimitedDataParser(valueParserFactories, delimiter, quote, hasHeader, recordType,
+        return new DelimitedDataParser(ctx, valueParserFactories, delimiter, quote, hasHeader, recordType,
                 ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM));
     }
 
@@ -62,7 +62,7 @@
     @Override
     public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        return createParser();
+        return createParser(ctx);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
index d34b5e0..704ea29 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -55,7 +55,7 @@
         String recordFormat = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT);
         if (recordFormat == null) {
             throw AlgebricksException.create(ErrorCode.UNKNOWN_RECORD_FORMAT_FOR_META_PARSER,
-                    ExternalDataConstants.KEY_FORMAT);
+                    ExternalDataConstants.KEY_RECORD_FORMAT);
         }
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
         if (format == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 53cf6b1..2265a25 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -46,15 +46,12 @@
     public static IDataParserFactory getDataParserFactory(ILibraryManager libraryManager,
             Map<String, String> configuration) throws AsterixException {
         IDataParserFactory parserFactory;
-        String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+        String parserFactoryName = configuration.get(ExternalDataConstants.KEY_PARSER);
         if (ExternalDataUtils.isExternal(parserFactoryName)) {
             return ExternalDataUtils.createExternalParserFactory(libraryManager,
                     ExternalDataUtils.getDataverse(configuration), parserFactoryName);
         } else {
-            String parserFactoryKey = ExternalDataUtils.getRecordFormat(configuration);
-            if (parserFactoryKey == null) {
-                parserFactoryKey = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
-            }
+            String parserFactoryKey = ExternalDataUtils.getParserFactory(configuration);
             parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
         }
         return parserFactory;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 1378207..a05ad77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.external.util;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 public class ExternalDataConstants {
 
     private ExternalDataConstants() {
@@ -168,6 +172,26 @@
     public static final String FORMAT_LINE_SEPARATED = "line-separated";
     public static final String FORMAT_HDFS_WRITABLE = "hdfs-writable";
     public static final String FORMAT_KV = "kv";
+    public static final String FORMAT_CSV = "csv";
+    public static final String FORMAT_TSV = "tsv";
+    public static final Set<String> ALL_FORMATS;
+    static {
+        Set<String> formats = new HashSet<>(13);
+        formats.add(FORMAT_HIVE);
+        formats.add(FORMAT_BINARY);
+        formats.add(FORMAT_ADM);
+        formats.add(FORMAT_JSON_LOWER_CASE);
+        formats.add(FORMAT_DELIMITED_TEXT);
+        formats.add(FORMAT_TWEET);
+        formats.add(FORMAT_RSS);
+        formats.add(FORMAT_SEMISTRUCTURED);
+        formats.add(FORMAT_LINE_SEPARATED);
+        formats.add(FORMAT_HDFS_WRITABLE);
+        formats.add(FORMAT_KV);
+        formats.add(FORMAT_CSV);
+        formats.add(FORMAT_TSV);
+        ALL_FORMATS = Collections.unmodifiableSet(formats);
+    }
 
     /**
      * input streams
@@ -234,8 +258,6 @@
     public static final String EXTERNAL = "external";
     public static final String KEY_READER_FACTORY = "reader-factory";
     public static final String READER_RSS = "rss_feed";
-    public static final String FORMAT_CSV = "csv";
-    public static final String FORMAT_TSV = "tsv";
 
     public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 443aa7e..b7b441b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -44,41 +44,49 @@
 
 public class ExternalDataUtils {
 
+    private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
+    static {
+        valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
+    }
+
     private ExternalDataUtils() {
     }
 
     // Get a delimiter from the given configuration
-    public static char getDelimiter(Map<String, String> configuration) throws HyracksDataException {
+    public static char validateGetDelimiter(Map<String, String> configuration) throws HyracksDataException {
         String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
         if (delimiterValue == null) {
-            delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
-        } else if (delimiterValue.length() != 1) {
-            throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER,
-                    delimiterValue);
+            return ExternalDataConstants.DEFAULT_DELIMITER.charAt(0);
         }
+        validateDelimiter(delimiterValue);
         return delimiterValue.charAt(0);
     }
 
     // Get a quote from the given configuration when the delimiter is given
     // Need to pass delimiter to check whether they share the same character
-    public static char getQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
+    public static char validateGetQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
         String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
         if (quoteValue == null) {
-            quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
-        } else if (quoteValue.length() != 1) {
-            throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE,
-                    quoteValue);
+            return ExternalDataConstants.DEFAULT_QUOTE.charAt(0);
         }
+        validateQuote(quoteValue);
+        char quote = quoteValue.charAt(0);
+        validateDelimiterAndQuote(delimiter, quote);
+        return quote;
+    }
 
-        // Since delimiter (char type value) can't be null,
-        // we only check whether delimiter and quote use the same character
-        if (quoteValue.charAt(0) == delimiter) {
-            throw new RuntimeDataException(
-                    ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quoteValue,
-                    delimiter);
+    public static char validateGetQuoteEscape(Map<String, String> configuration) throws HyracksDataException {
+        String quoteEscapeValue = configuration.get(ExternalDataConstants.KEY_QUOTE_ESCAPE);
+        if (quoteEscapeValue == null) {
+            return ExternalDataConstants.ESCAPE;
         }
-
-        return quoteValue.charAt(0);
+        validateQuoteEscape(quoteEscapeValue);
+        return quoteEscapeValue.charAt(0);
     }
 
     public static void validateDataParserParameters(Map<String, String> configuration) throws AsterixException {
@@ -86,8 +94,8 @@
         if (parser == null) {
             String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
             if (parserFactory == null) {
-                throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " or "
-                        + ExternalDataConstants.KEY_PARSER_FACTORY + " must be specified.");
+                throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED,
+                        ExternalDataConstants.KEY_FORMAT + " or " + ExternalDataConstants.KEY_PARSER_FACTORY);
             }
         }
     }
@@ -95,7 +103,7 @@
     public static void validateDataSourceParameters(Map<String, String> configuration) throws AsterixException {
         String reader = configuration.get(ExternalDataConstants.KEY_READER);
         if (reader == null) {
-            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
+            throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_READER);
         }
     }
 
@@ -141,22 +149,13 @@
         return configuration.get(ExternalDataConstants.KEY_DATAVERSE);
     }
 
-    public static String getRecordFormat(Map<String, String> configuration) {
-        String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
-        return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
-    }
-
-    private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
-
-    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
-        Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class);
-        m.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
-        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        m.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
-        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-        m.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
-        return m;
+    public static String getParserFactory(Map<String, String> configuration) {
+        String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER);
+        if (parserFactory != null) {
+            return parserFactory;
+        }
+        parserFactory = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        return parserFactory != null ? parserFactory : configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
     }
 
     public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
@@ -284,8 +283,7 @@
     public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException {
         String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
         if (keyIndexes == null) {
-            throw new AsterixException(
-                    "A change feed must have the parameter " + ExternalDataConstants.KEY_KEY_INDEXES);
+            throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_KEY_INDEXES);
         }
         return keyIndexes.split(",").length;
     }
@@ -340,7 +338,7 @@
     }
 
     /**
-     * Prepares the configuration of the external dataset and its adapter by filling the information required by
+     * Prepares the configuration of the external data and its adapter by filling the information required by
      * adapters and parsers.
      *
      * @param adapterName adapter name
@@ -355,4 +353,82 @@
             configuration.put(ExternalDataConstants.KEY_PARSER, configuration.get(ExternalDataConstants.KEY_FORMAT));
         }
     }
+
+    /**
+     * Normalizes the values of certain parameters of the adapter configuration. This should happen before persisting
+     * the metadata (e.g. when creating external datasets or feeds) and when creating an adapter factory.
+     *
+     * @param configuration external data configuration
+     */
+    public static void normalize(Map<String, String> configuration) {
+        // normalize the "format" parameter
+        String paramValue = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        if (paramValue != null) {
+            String lowerCaseFormat = paramValue.toLowerCase().trim();
+            if (ExternalDataConstants.ALL_FORMATS.contains(lowerCaseFormat)) {
+                configuration.put(ExternalDataConstants.KEY_FORMAT, lowerCaseFormat);
+            }
+        }
+        // normalize the "header" parameter
+        paramValue = configuration.get(ExternalDataConstants.KEY_HEADER);
+        if (paramValue != null) {
+            configuration.put(ExternalDataConstants.KEY_HEADER, paramValue.toLowerCase().trim());
+        }
+    }
+
+    /**
+     * Validates the parameter values of the adapter configuration. This should happen after normalizing the values.
+     *
+     * @param configuration external data configuration
+     * @throws HyracksDataException HyracksDataException
+     */
+    public static void validate(Map<String, String> configuration) throws HyracksDataException {
+        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        String header = configuration.get(ExternalDataConstants.KEY_HEADER);
+        if (format != null && isHeaderRequiredFor(format) && header == null) {
+            throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_HEADER);
+        }
+        if (header != null && !isBoolean(header)) {
+            throw new RuntimeDataException(ErrorCode.INVALID_REQ_PARAM_VAL, ExternalDataConstants.KEY_HEADER, header);
+        }
+        char delimiter = validateGetDelimiter(configuration);
+        validateGetQuote(configuration, delimiter);
+        validateGetQuoteEscape(configuration);
+    }
+
+    private static boolean isHeaderRequiredFor(String format) {
+        return format.equals(ExternalDataConstants.FORMAT_CSV) || format.equals(ExternalDataConstants.FORMAT_TSV);
+    }
+
+    private static boolean isBoolean(String value) {
+        return value.equals(ExternalDataConstants.TRUE) || value.equals(ExternalDataConstants.FALSE);
+    }
+
+    private static void validateDelimiter(String delimiter) throws RuntimeDataException {
+        if (delimiter.length() != 1) {
+            throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER,
+                    delimiter);
+        }
+    }
+
+    public static void validateQuote(String quote) throws RuntimeDataException {
+        if (quote.length() != 1) {
+            throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quote,
+                    ExternalDataConstants.KEY_QUOTE);
+        }
+    }
+
+    private static void validateQuoteEscape(String quoteEsc) throws RuntimeDataException {
+        if (quoteEsc.length() != 1) {
+            throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quoteEsc,
+                    ExternalDataConstants.KEY_QUOTE_ESCAPE);
+        }
+    }
+
+    private static void validateDelimiterAndQuote(char delimiter, char quote) throws RuntimeDataException {
+        if (quote == delimiter) {
+            throw new RuntimeDataException(
+                    ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quote, delimiter);
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index 47f784a..8ef6273 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -48,9 +48,11 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Assert;
 
 import junit.framework.Test;
@@ -58,6 +60,9 @@
 import junit.framework.TestSuite;
 
 public class ClassAdToADMTest extends TestCase {
+
+    private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
+
     /**
      * Create the test case
      *
@@ -123,7 +128,7 @@
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
                 SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
-                recordReader.configure(in, config);
+                recordReader.configure(ctx, in, config);
                 while (recordReader.hasNext()) {
                     tb.reset();
                     IRawRecord<char[]> record = recordReader.next();
@@ -162,7 +167,7 @@
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
                 SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
-                recordReader.configure(in, config);
+                recordReader.configure(ctx, in, config);
                 try {
                     Value val = new Value(objectPool);
                     while (recordReader.hasNext()) {
@@ -204,7 +209,7 @@
                 FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
                 LocalFSInputStream in = new LocalFSInputStream(watcher);
                 SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
-                recordReader.configure(in, config);
+                recordReader.configure(ctx, in, config);
                 try {
                     Value val = new Value(objectPool);
                     while (recordReader.hasNext()) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
index 2ba5a3e..2972542 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
@@ -41,6 +41,8 @@
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -56,6 +58,7 @@
     private final CharBuffer chars = CharBuffer.allocate(BUFFER_SIZE);
     private final CharArrayRecord value = new CharArrayRecord();
     private final ByteBuf nettyBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(KB32, Integer.MAX_VALUE);
+    private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
 
     @Test
     public void eatGlass() {
@@ -83,7 +86,7 @@
         FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
         LocalFSInputStream in = new LocalFSInputStream(watcher);
         try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader()) {
-            recordReader.configure(in, config);
+            recordReader.configure(ctx, in, config);
             while (recordReader.hasNext()) {
                 try {
                     IRawRecord<char[]> record = recordReader.next();
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 1584065..47c6ffe 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -44,12 +44,15 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Assert;
 
 public class RecordWithMetaTest {
+    private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
     private static ARecordType recordType;
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -90,7 +93,7 @@
             config.put(ExternalDataConstants.KEY_HEADER, "true");
             config.put(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE);
             LineRecordReader lineReader = new LineRecordReader();
-            lineReader.configure(inputStream, config);
+            lineReader.configure(ctx, inputStream, config);
             // create csv with json record reader
             CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter(
                     valueIndex, delimiter, metaType, recordType, pkIndicators, pkIndexes, keyTypes);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 7ed53e4..071fd1c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -135,13 +135,13 @@
                     default:
                         throw new AsterixException("Unknown Adapter type " + adapterType);
                 }
-                adapterFactory.setOutputType(adapterOutputType);
-                adapterFactory.setMetaType(metaType);
-                adapterFactory.configure(appCtx.getServiceContext(), configuration);
             } else {
-                AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(), adapterName, configuration,
-                        adapterOutputType, metaType);
+                ExternalDataUtils.prepare(adapterName, configuration);
+                adapterFactory = (ITypedAdapterFactory) appCtx.getAdapterFactoryService().createAdapterFactory();
             }
+            adapterFactory.setOutputType(adapterOutputType);
+            adapterFactory.setMetaType(metaType);
+            adapterFactory.configure(appCtx.getServiceContext(), configuration);
             if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
                 metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
                 if (metaType == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index e44480c..3f78234 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -51,6 +51,7 @@
 
 public class TestUtils {
 
+    private static final int DEFAULT_FRAME_SIZE = 32768;
     public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() {
         @Override
         public void warn(Warning warning) {
@@ -68,6 +69,10 @@
         }
     };
 
+    public static IHyracksTaskContext createHyracksTask() {
+        return create(DEFAULT_FRAME_SIZE);
+    }
+
     public static IHyracksTaskContext create(int frameSize) {
         IOManager ioManager = null;
         try {