Merge "Merge commit 'f1cd417' from 'gerrit/mad-hatter'"
diff --git a/asterixdb/asterix-app/data/csv/01.csv b/asterixdb/asterix-app/data/csv/01.csv
new file mode 100644
index 0000000..6957e76
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/01.csv
@@ -0,0 +1,3 @@
+1,,"good","recommend"
+2,,"bad","not recommend"
+3,,"good",
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/02.csv b/asterixdb/asterix-app/data/csv/02.csv
new file mode 100644
index 0000000..630843f
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/02.csv
@@ -0,0 +1,3 @@
+4,2018,"good","recommend"
+5,2018,,"not recommend"
+6,2018,"good",
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/empty.csv b/asterixdb/asterix-app/data/csv/empty.csv
new file mode 100644
index 0000000..3f2ff2d
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/empty.csv
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/asterixdb/asterix-app/data/csv/sample_09.csv b/asterixdb/asterix-app/data/csv/sample_09.csv
new file mode 100644
index 0000000..b14219d
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_09.csv
@@ -0,0 +1,17 @@
+a,b,c,d,e
+0,",     boo", 1,2,3
+1,"","",❤,
+2,3,4,\n,
+3,"quoted ""f"" field",,,
+4,4,,,
+5,"{""vehicle"": ""car"", ""location"": [2.0, 0.1]}",,,
+6,2,3,,
+7,8,9,,
+8,2,3,,
+9,8,9,,
+10,"field
+""f""
+with multiple lines",,,
+11,4,,,
+12,5,ʤ,,
+John,Green,111 downtown st.,"city, state",99999
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/sample_10.csv b/asterixdb/asterix-app/data/csv/sample_10.csv
new file mode 100644
index 0000000..3beee08
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_10.csv
@@ -0,0 +1,39 @@
+1,"?/ Text ending with a backslash  / \",2000-09-03 07:12:22
+2,non quoted text!yes......,2003-08-09 22:34:19
+3,Text with more sentences. Another sentence.,2003-09-12 05:29:12
+4,"Quoted text.. yes.",2003-09-13 17:21:49
+5,Another text,2003-01-21 23:31:41
+6,Text with' quotes.,2003-09-14 20:15:50
+7,Text with quote's,2003-09-14 18:34:03
+8,"Text with quotes '",2003-01-28 20:32:13
+9,"Text with quotes """,2003-01-18 11:44:15
+10,Text with question marks!?!?,2003-09-18 06:25:56
+11,""" Text that starts with quotes",2003-09-12 00:31:24
+12,"Text with \"" backslash and quotes",2003-09-13 20:30:06
+13,"Text with \"" backslash and quotes\""",2003-09-14 16:20:36
+14,"Text that has comma ,",2003-09-12 08:21:18
+15,"Text that has "","" quoted comma",2003-09-12 08:21:18
+16,",Text that has ",2003-09-12 08:21:18
+17,","",Text that has ",2003-09-12 08:21:18
+18,"Text with commas,inside it., yes",2003-09-13 23:42:14
+19,"Text that has \n inside ",2003-09-12 08:21:18
+20,"Text that has \\\n inside ",2003-09-12 08:21:18
+21,text with :),2003-09-05 19:15:34
+22,"Text that has \\\"" inside \\",2003-09-12 08:21:18
+23,"Text that has \\\"" inside \\""",2003-09-12 08:21:18
+24,"""text that spans multiple
+Lines and more
+Lines ane more and more
+Lines ...
+And yet more lines
+And more""",2011-09-19 01:09:09
+25,"Text ""
+more lines",2011-09-19 01:09:09
+26,"""
+",2011-09-19 01:09:09
+27,"Text",""
+28,"Text","2011-09-19 01:09:09"
+29,"Text\.","2011-09-19 01:09:09"
+30,Text\.,"2011-09-19 01:09:09"
+31,"\.Text","2011-09-19 01:09:09"
+32,\.Text,"2011-09-19 01:09:09"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/sample_11.csv b/asterixdb/asterix-app/data/csv/sample_11.csv
new file mode 100644
index 0000000..b9a9571
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_11.csv
@@ -0,0 +1,4 @@
+1,","", b", 3,4,5
+","", b",4, 3,4,5
+,,,,
+"dd",,,,
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/sample_12.csv b/asterixdb/asterix-app/data/csv/sample_12.csv
new file mode 100644
index 0000000..0c9baf5
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_12.csv
@@ -0,0 +1,16 @@
+f1,f2,f3
+1,true,"text"
+2,false,"text"
+3,true,"text"
+4,true,""
+5,false,
+6,true,"text""
+more lines"
+7,false,"""
+"
+8,true,""
+9,false,"text"""
+10,false,text\.
+11,true,"text\."
+,false,\.text
+13,true,"\.text"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/sample_13.csv b/asterixdb/asterix-app/data/csv/sample_13.csv
new file mode 100644
index 0000000..9f53f56
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_13.csv
@@ -0,0 +1,11 @@
+
+
+f1,f2,f3,f4
+
+1,,"good","recommend"
+
+2,,"bad","not recommend"
+3,,"good",
+
+
+
diff --git a/asterixdb/asterix-app/data/tsv/01.tsv b/asterixdb/asterix-app/data/tsv/01.tsv
new file mode 100644
index 0000000..98876c7
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/01.tsv
@@ -0,0 +1,3 @@
+1		"good"	"recommend"
+2		"bad"	"not recommend"
+3		"good"	"recommend"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/02.tsv b/asterixdb/asterix-app/data/tsv/02.tsv
new file mode 100644
index 0000000..c01ce7c
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/02.tsv
@@ -0,0 +1,3 @@
+4	2018	"good"	"recommend"
+5	2018		"not recommend"
+6	2018	"good"	"recommend"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/sample_01.tsv b/asterixdb/asterix-app/data/tsv/sample_01.tsv
new file mode 100644
index 0000000..aab289a
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/sample_01.tsv
@@ -0,0 +1,28 @@
+11	55	text field wih , charrrrrrrrrrr	true	90	0.666666667
+12	55	text field with " charrrrrrrrrr	false	90	0.666666667
+14	55	text field with ' charrrrrrrrrr	false	90	0.666666667
+15	55	text field with \ charrrrrrrrrr	false	90	0.666666667
+16	55	text field wih \, char         	true	90	0.666666667
+17	55	text field with \" charrrrrrrrr	false	90	0.666666667
+18	55	text field with \' charrrrrrrrr	false	90	0.666666667
+19	55	text field with \\ charrrrrrrrr	false	90	0.666666667
+20	55	text field ending with  charr ,	false	90	0.666666667
+21	55	text field ending with  charr "	false	90	0.666666667
+22	55	text field ending with  charr '	false	90	0.666666667
+23	55	text field ending with  charr \	false	90	0.666666667
+24	55	text field ending with charr \,	false	90	0.666666667
+25	55	text field ending with charr \"	false	90	0.666666667
+26	55	text field ending with charr \'	false	90	0.666666667
+27	55	text field ending with charr \\	false	90	0.666666667
+28	55	,text field starting with charr	false	90	0.666666667
+29	55	"text field starting with charr	false	90	0.666666667
+30	55	'text field starting with charr	false	90	0.666666667
+31	55	\text field starting with charr	false	90	0.666666667
+32	55	\,text field starting with char	false	90	0.666666667
+33	55	\"text field starting with char	false	90	0.666666667
+34	55	\'text field starting with char	false	90	0.666666667
+35	55	\\text field starting with char	false	90	0.666666667
+36	55	"text field inside   with char"	false	90	0.666666667
+37	55	  text field with charrrrrrrrr 	false	90	0.666666667
+38	55	text field with "" charrrrrrrrr	false	90	0.666666667
+39	55	text field "with" charrrrrrrrrr	false	90	0.666666667
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f83e21e..ea2ce40 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -93,6 +93,7 @@
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.lang.common.base.IReturningStatement;
 import org.apache.asterix.lang.common.base.IRewriterFactory;
@@ -677,6 +678,7 @@
                                 itemTypeDataverseName + "." + itemTypeName);
                     }
                     itemType = itemTypeEntity.getDatatype();
+                    validateDatasetItemType(dsType, itemType, false, sourceLoc);
                     break;
                 case RECORD:
                     itemTypeDataverseName = dataverseName;
@@ -684,6 +686,7 @@
                     lockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), itemTypeDataverseName,
                             itemTypeName);
                     itemType = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx);
+                    validateDatasetItemType(dsType, itemType, false, sourceLoc);
                     MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
                             new Datatype(itemTypeDataverseName, itemTypeName, itemType, true));
                     break;
@@ -701,13 +704,8 @@
             } else {
                 validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false, sourceLoc);
             }
-            switch (dd.getDatasetType()) {
+            switch (dsType) {
                 case INTERNAL:
-                    if (itemType.getTypeTag() != ATypeTag.OBJECT) {
-                        throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
-                                "Dataset type has to be a record type.");
-                    }
-
                     IAType metaItemType = null;
                     if (metaItemTypeExpr != null) {
                         switch (metaItemTypeExpr.getTypeKind()) {
@@ -720,10 +718,7 @@
                                             metaItemTypeDataverseName + "." + metaItemTypeName);
                                 }
                                 metaItemType = metaItemTypeEntity.getDatatype();
-                                if (metaItemType.getTypeTag() != ATypeTag.OBJECT) {
-                                    throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
-                                            "Dataset meta type has to be a record type.");
-                                }
+                                validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
                                 break;
                             case RECORD:
                                 metaItemTypeDataverseName = dataverseName;
@@ -732,6 +727,7 @@
                                         metaItemTypeDataverseName, metaItemTypeName);
                                 metaItemType = translateType(metaItemTypeDataverseName, metaItemTypeName,
                                         metaItemTypeExpr, mdTxnCtx);
+                                validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
                                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
                                         new Datatype(metaItemTypeDataverseName, metaItemTypeName, metaItemType, true));
                                 break;
@@ -769,12 +765,15 @@
                 case EXTERNAL:
                     ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
                     Map<String, String> properties = createExternalDatasetProperties(dd, metadataProvider, mdTxnCtx);
+                    ExternalDataUtils.normalize(properties);
+                    ExternalDataUtils.validate(properties);
+                    validateExternalDatasetDetails(externalDetails, properties);
                     datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
                             TransactionState.COMMIT);
                     break;
                 default:
                     throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
-                            "Unknown dataset type " + dd.getDatasetType());
+                            "Unknown dataset type " + dsType);
             }
 
             // #. initialize DatasetIdFactory if it is not initialized.
@@ -788,7 +787,7 @@
                     datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
                     MetadataUtil.PENDING_ADD_OP, compressionScheme);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-            if (dd.getDatasetType() == DatasetType.INTERNAL) {
+            if (dsType == DatasetType.INTERNAL) {
                 JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataset, metadataProvider);
 
                 // #. make metadataTxn commit before calling runJob.
@@ -857,6 +856,25 @@
         }
     }
 
+    /**
+     * Checks that a dataset's item type (or meta item type) is an object type.
+     *
+     * @param datasetType kind of the dataset being created; not consulted by this implementation
+     * @param itemType resolved type to check
+     * @param isMetaItemType whether {@code itemType} is the meta item type; only changes the error message
+     * @param sourceLoc source location passed to the raised exception
+     * @throws AlgebricksException a {@link CompilationException} when the type tag is not {@link ATypeTag#OBJECT}
+     */
+    protected void validateDatasetItemType(DatasetType datasetType, IAType itemType, boolean isMetaItemType,
+            SourceLocation sourceLoc) throws AlgebricksException {
+        if (itemType.getTypeTag() == ATypeTag.OBJECT) {
+            return;
+        }
+        String typeRole = isMetaItemType ? "meta type" : "type";
+        throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
+                String.format("Dataset %s has to be a record type.", typeRole));
+    }
+
     protected Map<String, String> createExternalDatasetProperties(DatasetDecl dd, MetadataProvider metadataProvider,
             MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
         ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
@@ -2218,9 +2225,11 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
         try {
-            CompiledLoadFromFileStatement cls =
-                    new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName(), loadStmt.getAdapter(),
-                            loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
+            Map<String, String> properties = loadStmt.getProperties();
+            ExternalDataUtils.normalize(properties);
+            ExternalDataUtils.validate(properties);
+            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName,
+                    loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
             cls.setSourceLocation(stmt.getSourceLocation());
             JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
                     null, responsePrinter, warningCollector);
@@ -2417,7 +2426,10 @@
                             "A feed with this name " + feedName + " already exists.");
                 }
             }
-            feed = new Feed(dataverseName, feedName, cfs.getConfiguration());
+            Map<String, String> configuration = cfs.getConfiguration();
+            ExternalDataUtils.normalize(configuration);
+            ExternalDataUtils.validate(configuration);
+            feed = new Feed(dataverseName, feedName, configuration);
             FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx);
             MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3426,4 +3438,25 @@
             throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
         }
     }
+
+    /**
+     * Enforces adapter-specific requirements on an external dataset declaration. The only rule applied here
+     * is that the AWS S3 adapter must be given a "format" parameter.
+     *
+     * @param externalDetails declaration whose adapter name is inspected
+     * @param properties dataset properties in which the "format" entry is looked up
+     * @throws RuntimeDataException with {@code ErrorCode.PARAMETERS_REQUIRED} when the adapter is S3 and
+     *             no "format" value is present
+     */
+    protected void validateExternalDatasetDetails(ExternalDetailsDecl externalDetails, Map<String, String> properties)
+            throws RuntimeDataException {
+        boolean isS3Adapter = ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(externalDetails.getAdapter());
+        if (!isS3Adapter) {
+            return;
+        }
+        // "format" parameter is needed for "S3" data source
+        if (properties.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_FORMAT);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 3b4cdf8..d2158ba 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.test.external_dataset.aws;
 
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -66,9 +69,13 @@
     private static S3Client client;
     private static final String S3_MOCK_SERVER_BUCKET = "playground";
     private static final String S3_MOCK_SERVER_BUCKET_DEFINITION = "json-data/reviews/"; // data resides here
+    private static final String S3_MOCK_SERVER_BUCKET_CSV_DEFINITION = "csv-data/reviews/"; // data resides here
+    private static final String S3_MOCK_SERVER_BUCKET_TSV_DEFINITION = "tsv-data/reviews/"; // data resides here
     private static final String S3_MOCK_SERVER_REGION = "us-west-2";
     private static final int S3_MOCK_SERVER_PORT = 8001;
     private static final String S3_MOCK_SERVER_HOSTNAME = "http://localhost:" + S3_MOCK_SERVER_PORT;
+    private static final String CSV_DATA_PATH = joinPath("data", "csv");
+    private static final String TSV_DATA_PATH = joinPath("data", "tsv");
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -210,6 +217,26 @@
                 PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
                         .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q2/2.json").build(),
                 RequestBody.fromString("{\"id\": 14, \"year\": 2019, \"quarter\": 2, \"review\": \"bad\"}"));
+
+        LOGGER.info("Adding CSV files to the bucket");
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_CSV_DEFINITION + "01.csv").build(),
+                RequestBody.fromFile(Paths.get(CSV_DATA_PATH, "01.csv")));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_CSV_DEFINITION + "2018/01.csv").build(),
+                RequestBody.fromFile(Paths.get(CSV_DATA_PATH, "02.csv")));
+
+        LOGGER.info("Adding TSV files to the bucket");
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_TSV_DEFINITION + "01.tsv").build(),
+                RequestBody.fromFile(Paths.get(TSV_DATA_PATH, "01.tsv")));
+        client.putObject(
+                PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+                        .key(S3_MOCK_SERVER_BUCKET_TSV_DEFINITION + "2018/01.tsv").build(),
+                RequestBody.fromFile(Paths.get(TSV_DATA_PATH, "02.tsv")));
         LOGGER.info("Files added successfully");
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
new file mode 100644
index 0000000..3a4823f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.hyracks.control.nc.result.ResultPartitionReader;
+import org.apache.hyracks.util.Span;
+import org.apache.hyracks.util.ThreadDumpUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that thread dumps stop mentioning {@link ResultPartitionReader} shortly after a client submits a
+ * query and closes its connection without reading the result.
+ */
+public class ResultStreamingFailureTest {
+
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        integrationUtil.init(true, AsterixHyracksIntegrationUtil.DEFAULT_CONF_FILE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    /**
+     * Runs a query, drops the connection, then re-takes a thread dump once per second until it no longer
+     * mentions {@link ResultPartitionReader}; fails with the offending dump once the 5-second span elapses.
+     */
+    @Test
+    public void resultStreamingFailureTest() throws Exception {
+        queryAndDropConnection();
+        // allow result sender to terminate and ensure no leaks
+        final String readerClassName = ResultPartitionReader.class.getName();
+        Span timeout = Span.start(5, TimeUnit.SECONDS);
+        String threadDump = ThreadDumpUtil.takeDumpString();
+        while (threadDump.contains(readerClassName)) {
+            if (timeout.elapsed()) {
+                // report the dump that actually showed the leak instead of taking a fresh one
+                throw new AssertionError("found leaking senders in:\n" + threadDump);
+            }
+            TimeUnit.SECONDS.sleep(1);
+            threadDump = ThreadDumpUtil.takeDumpString();
+        }
+    }
+
+    /**
+     * Posts a query to the local query service, asserts a 200 status, and closes the HTTP client without
+     * consuming the response body.
+     */
+    private void queryAndDropConnection() throws IOException {
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            final List<NameValuePair> params = new ArrayList<>();
+            params.add(new BasicNameValuePair("statement", "select * from range(1, 10000000) r;"));
+            HttpPost request = new HttpPost("http://localhost:19004/query/service");
+            request.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
+            CloseableHttpResponse response = httpClient.execute(request);
+            Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+            // close connection without streaming the result
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
new file mode 100644
index 0000000..f7fe18c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE t1 AS {f1: string, f2: string, f3: string, f4: string, f5: string};
+CREATE TYPE t2 AS {f1: string, f2: string, f3: string};
+CREATE TYPE t3 AS {f1: int?, f2: boolean, f3: string?};
+CREATE TYPE t4 AS {f1: string, f2: string, f3: string, f4: string};
+
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="CSV"), ("header"="FALSE"));
+CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sample_10.csv"), ("format"="Csv"), ("header"="False"));
+CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"), ("header"="FALSE"));
+CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"));
+CREATE EXTERNAL DATASET ds5(t4) USING localfs(("path"="asterix_nc1://data/csv/sample_13.csv"), ("format"="csv"), ("header"="True"));
+CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty.csv"), ("format"="csv"), ("header"="false"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.2.query.sqlpp
new file mode 100644
index 0000000..d870372
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.2.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.3.query.sqlpp
new file mode 100644
index 0000000..64a2f8a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds2 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.4.query.sqlpp
new file mode 100644
index 0000000..313198c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.4.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds3 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.5.query.sqlpp
new file mode 100644
index 0000000..065de4e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.5.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds4 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp
new file mode 100644
index 0000000..a3d113d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+// Return every record of ds5, sorted by its f1 field.
+SELECT VALUE v FROM ds5 AS v ORDER BY v.f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp
new file mode 100644
index 0000000..2e5b312
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+// Return every record of ds6, sorted by its f1 field.
+SELECT VALUE v FROM ds6 AS v ORDER BY v.f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp
new file mode 100644
index 0000000..86a1b59
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test; /* last step of this test case: drops the test dataverse */
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
new file mode 100644
index 0000000..cabe54b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+// Record type with six typed fields (f1..f6) for the rows of the TSV sample.
+CREATE TYPE t1 AS {f1: int, f2: int, f3: string, f4: boolean, f5: bigint, f6: double};
+// External dataset over the local file sample_01.tsv, read with "format"="tsv" and "header"="FALSE".
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/tsv/sample_01.tsv"), ("format"="tsv"), ("header"="FALSE"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.2.query.sqlpp
new file mode 100644
index 0000000..d870372
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.2.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+// Return every record of ds1 as-is; no ordering is requested.
+SELECT VALUE v FROM ds1 AS v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.3.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.3.ddl.sqlpp
new file mode 100644
index 0000000..86a1b59
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.3.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test; /* last step of this test case: drops the test dataverse */
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
new file mode 100644
index 0000000..6184b19
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+// Review record: id and review are required; year and details are optional (declared with "?").
+DROP TYPE test IF EXISTS;
+CREATE TYPE test AS {id: int, year: int?, review: string, details: string?};
+// External S3 dataset: dummy credentials, endpoint on localhost:8001, objects under csv-data/reviews read as CSV.
+DROP DATASET test IF EXISTS;
+CREATE EXTERNAL DATASET test(test) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="csv-data/reviews"),
+("format"="Csv"),
+("header"="false")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.002.query.sqlpp
new file mode 100644
index 0000000..6e31eb3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.002.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+// Return every record of the test dataset in ascending id order.
+SELECT VALUE test FROM test ORDER BY id ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.003.ddl.sqlpp
new file mode 100644
index 0000000..0ff713d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.003.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATASET test IF EXISTS; /* drops the external dataset named test, if it is present */
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
new file mode 100644
index 0000000..194adf6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+// Review record: id and review are required; year and details are optional (declared with "?").
+DROP TYPE test IF EXISTS;
+CREATE TYPE test AS {id: int, year: int?, review: string, details: string?};
+// External S3 dataset: dummy credentials, endpoint on localhost:8001, objects under tsv-data/reviews read as TSV.
+DROP DATASET test IF EXISTS;
+CREATE EXTERNAL DATASET test(test) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="tsv-data/reviews"),
+("format"="TSV"),
+("header"="False")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.002.query.sqlpp
new file mode 100644
index 0000000..6e31eb3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.002.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+// Return every record of the test dataset in ascending id order.
+SELECT VALUE test FROM test ORDER BY id ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.003.ddl.sqlpp
new file mode 100644
index 0000000..0ff713d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.003.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATASET test IF EXISTS; /* drops the external dataset named test, if it is present */
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp
new file mode 100644
index 0000000..e0fc056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Negative test: the "format" parameter is deliberately left out of the S3 dataset definition below.
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+DROP TYPE test IF EXISTS;
+CREATE TYPE test AS {id: int, year: int?, review: string, details: string?};
+// NOTE(review): the CREATE below has no "format" entry; the expected error is presumably asserted in the testsuite definition - confirm.
+DROP DATASET test IF EXISTS;
+CREATE EXTERNAL DATASET test(test) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="tsv-data/reviews")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/list/get-item_03/get-item_03.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/list/get-item_03/get-item_03.4.query.sqlpp
new file mode 100644
index 0000000..ab27331
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/list/get-item_03/get-item_03.4.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+// Negative list indexes: ta* use literal indexes, tb* compute them from a subquery on test; the fourth case of each group is checked with IS MISSING.
+{
+"ta1": [1, 2, 3][-1],
+"ta2": [1, 2, 3][-2],
+"ta3": [1, 2, 3][-3],
+"ta4": [1, 2, 3][-4] is missing,
+"tb1": [1, 2, 3][-((select value id from test where id = 1)[0])],
+"tb2": [1, 2, 3][-((select value id from test where id = 1)[0])-1],
+"tb3": [1, 2, 3][-((select value id from test where id = 1)[0])-2],
+"tb4": [1, 2, 3][-((select value id from test where id = 1)[0])-3] is missing
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_01/case_01.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_01/case_01.1.query.sqlpp
index c2a4cea..7f9647b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_01/case_01.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_01/case_01.1.query.sqlpp
@@ -24,5 +24,5 @@
     WHEN MISSING THEN -2
     ELSE 2.0/t
   END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_02/case_02.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_02/case_02.1.query.sqlpp
index ab19bcb..095eb20 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_02/case_02.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_02/case_02.1.query.sqlpp
@@ -24,6 +24,6 @@
     WHEN t IS MISSING THEN -2
     ELSE 2.0/t
   END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
 
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_03/case_03.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_03/case_03.1.query.sqlpp
index 7c3e566..12cd3f6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_03/case_03.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_03/case_03.1.query.sqlpp
@@ -24,5 +24,5 @@
     WHEN t IS MISSING THEN (SELECT -2 AS r)
     ELSE (SELECT -3 AS r)
   END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_04/case_04.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_04/case_04.1.query.sqlpp
index 154e611..e99da85 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_04/case_04.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_04/case_04.1.query.sqlpp
@@ -24,5 +24,5 @@
     WHEN t IS MISSING THEN (SELECT -2)
     ELSE 2.0/t
   END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_05/case_05.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_05/case_05.1.query.sqlpp
index 32030a7..5940b19 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_05/case_05.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_05/case_05.1.query.sqlpp
@@ -26,5 +26,5 @@
     WHEN t=2 THEN MISSING
     ELSE 2.0/t
   END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_06/case_06.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_06/case_06.1.query.sqlpp
index 5bf2786..8fbaa7a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_06/case_06.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_06/case_06.1.query.sqlpp
@@ -22,5 +22,5 @@
     WHEN t = 0 THEN MISSING
     ELSE NULL
   END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_07/case_07.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_07/case_07.1.query.sqlpp
index 4850047..27180b5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_07/case_07.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_07/case_07.1.query.sqlpp
@@ -27,4 +27,4 @@
     WHEN t IS UNKNOWN THEN (SELECT -3) // Should never enter this THEN branch.
     ELSE 2.0/t
   END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/numeric/add_double/add_double.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/numeric/add_double/add_double.1.query.sqlpp
index 366f2b1..a5b1e1b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/numeric/add_double/add_double.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/numeric/add_double/add_double.1.query.sqlpp
@@ -18,4 +18,4 @@
  */
 
 
-{'result1':(double('-6.5d') + tinyint('+1')),'result2':(double('-6.5d') + smallint('2')),'result3':(double('-6.5d') + integer('+3')),'result4':(double('-6.5d') + bigint('-4')),'result5':(double('-6.5d') + float('-5.5f')),'result6':(double('-6.5d') + double('-6.5d')),'result7':(double('-6.5d') + null), 'result8':double('-6.5d') + [1.0][-1]};
+{'result1':(double('-6.5d') + tinyint('+1')),'result2':(double('-6.5d') + smallint('2')),'result3':(double('-6.5d') + integer('+3')),'result4':(double('-6.5d') + bigint('-4')),'result5':(double('-6.5d') + float('-5.5f')),'result6':(double('-6.5d') + double('-6.5d')),'result7':(double('-6.5d') + null), 'result8':double('-6.5d') + MISSING};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.1.ddl.sqlpp
new file mode 100644
index 0000000..d881c98
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.1.ddl.sqlpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test a long UNION ALL chain of GROUP BY branches (hand-expanded grouping sets)
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+// Closed record type for one row of tenk: 13 integer fields followed by 3 string fields.
+CREATE TYPE tenkType AS CLOSED {
+  unique1: integer,
+  unique2: integer,
+  two: integer,
+  four: integer,
+  ten: integer,
+  twenty: integer,
+  hundred: integer,
+  thousand: integer,
+  twothousand: integer,
+  fivethous: integer,
+  tenthous: integer,
+  odd100: integer,
+  even100: integer,
+  stringu1: string,
+  stringu2: string,
+  string4: string
+};
+// Dataset of tenkType records keyed on unique2.
+CREATE DATASET tenk(tenkType) PRIMARY KEY unique2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.2.update.sqlpp
new file mode 100644
index 0000000..7128175
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.2.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+// Load tenk from asterix_nc1://data/tenk.tbl, read as delimited text with "|" as the delimiter.
+LOAD DATASET tenk USING localfs ((`path`=`asterix_nc1://data/tenk.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.3.query.sqlpp
new file mode 100644
index 0000000..ddd69bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.3.query.sqlpp
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+// Twelve UNION ALL branches: for each key list (4, 3, 2, 1 keys) group by the raw, negated and tostring() keys, summing unique1.
+select two, four, ten, twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by two, four, ten, twenty
+
+union all
+select two, four, ten, twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by -two as two, -four as four, -ten as ten, -twenty as twenty
+
+union all
+select two, four, ten, twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by tostring(two) as two, tostring(four) as four, tostring(ten) as ten, tostring(twenty) as twenty
+// Three grouping keys (two, four, ten); twenty is emitted as null.
+union all
+select two, four, ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by two, four, ten
+
+union all
+select two, four, ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by -two as two, -four as four, -ten as ten
+
+union all
+select two, four, ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by tostring(two) as two, tostring(four) as four, tostring(ten) as ten
+// Two grouping keys (two, four); ten and twenty are emitted as null.
+union all
+select two, four, null as ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by two, four
+
+union all
+select two, four, null as ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by -two as two, -four as four
+
+union all
+select two, four, null as ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by tostring(two) as two, tostring(four) as four
+// One grouping key (two); four, ten and twenty are emitted as null.
+union all
+select two, null as four, null as ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by two
+
+union all
+select two, null as four, null as ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by -two as two
+
+union all
+select two, null as four, null as ten, null as twenty,
+  sum(unique1) as agg_sum
+from tenk
+group by tostring(two) as two
+// NOTE(review): this ORDER BY is presumably meant to sort the combined UNION ALL output, not just the last branch - confirm.
+order by two, four, ten, twenty;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.2.adm
new file mode 100644
index 0000000..5c84fb8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.2.adm
@@ -0,0 +1,15 @@
+{ "f1": "a", "f2": "b", "f3": "c", "f4": "d", "f5": "e" }
+{ "f1": "0", "f2": ",     boo", "f3": " 1", "f4": "2", "f5": "3" }
+{ "f1": "1", "f2": "", "f3": "", "f4": "❤", "f5": "" }
+{ "f1": "2", "f2": "3", "f3": "4", "f4": "\\n", "f5": "" }
+{ "f1": "3", "f2": "quoted \"f\" field", "f3": "", "f4": "", "f5": "" }
+{ "f1": "4", "f2": "4", "f3": "", "f4": "", "f5": "" }
+{ "f1": "5", "f2": "{\"vehicle\": \"car\", \"location\": [2.0, 0.1]}", "f3": "", "f4": "", "f5": "" }
+{ "f1": "6", "f2": "2", "f3": "3", "f4": "", "f5": "" }
+{ "f1": "7", "f2": "8", "f3": "9", "f4": "", "f5": "" }
+{ "f1": "8", "f2": "2", "f3": "3", "f4": "", "f5": "" }
+{ "f1": "9", "f2": "8", "f3": "9", "f4": "", "f5": "" }
+{ "f1": "10", "f2": "field\n\"f\"\nwith multiple lines", "f3": "", "f4": "", "f5": "" }
+{ "f1": "11", "f2": "4", "f3": "", "f4": "", "f5": "" }
+{ "f1": "12", "f2": "5", "f3": "ʤ", "f4": "", "f5": "" }
+{ "f1": "John", "f2": "Green", "f3": "111 downtown st.", "f4": "city, state", "f5": "99999" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.3.adm
new file mode 100644
index 0000000..80f5fb7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.3.adm
@@ -0,0 +1,32 @@
+{ "f1": "1", "f2": "?/ Text ending with a backslash  / \\", "f3": "2000-09-03 07:12:22" }
+{ "f1": "2", "f2": "non quoted text!yes......", "f3": "2003-08-09 22:34:19" }
+{ "f1": "3", "f2": "Text with more sentences. Another sentence.", "f3": "2003-09-12 05:29:12" }
+{ "f1": "4", "f2": "Quoted text.. yes.", "f3": "2003-09-13 17:21:49" }
+{ "f1": "5", "f2": "Another text", "f3": "2003-01-21 23:31:41" }
+{ "f1": "6", "f2": "Text with' quotes.", "f3": "2003-09-14 20:15:50" }
+{ "f1": "7", "f2": "Text with quote's", "f3": "2003-09-14 18:34:03" }
+{ "f1": "8", "f2": "Text with quotes '", "f3": "2003-01-28 20:32:13" }
+{ "f1": "9", "f2": "Text with quotes \"", "f3": "2003-01-18 11:44:15" }
+{ "f1": "10", "f2": "Text with question marks!?!?", "f3": "2003-09-18 06:25:56" }
+{ "f1": "11", "f2": "\" Text that starts with quotes", "f3": "2003-09-12 00:31:24" }
+{ "f1": "12", "f2": "Text with \\\" backslash and quotes", "f3": "2003-09-13 20:30:06" }
+{ "f1": "13", "f2": "Text with \\\" backslash and quotes\\\"", "f3": "2003-09-14 16:20:36" }
+{ "f1": "14", "f2": "Text that has comma ,", "f3": "2003-09-12 08:21:18" }
+{ "f1": "15", "f2": "Text that has \",\" quoted comma", "f3": "2003-09-12 08:21:18" }
+{ "f1": "16", "f2": ",Text that has ", "f3": "2003-09-12 08:21:18" }
+{ "f1": "17", "f2": ",\",Text that has ", "f3": "2003-09-12 08:21:18" }
+{ "f1": "18", "f2": "Text with commas,inside it., yes", "f3": "2003-09-13 23:42:14" }
+{ "f1": "19", "f2": "Text that has \\n inside ", "f3": "2003-09-12 08:21:18" }
+{ "f1": "20", "f2": "Text that has \\\\\\n inside ", "f3": "2003-09-12 08:21:18" }
+{ "f1": "21", "f2": "text with :)", "f3": "2003-09-05 19:15:34" }
+{ "f1": "22", "f2": "Text that has \\\\\\\" inside \\\\", "f3": "2003-09-12 08:21:18" }
+{ "f1": "23", "f2": "Text that has \\\\\\\" inside \\\\\"", "f3": "2003-09-12 08:21:18" }
+{ "f1": "24", "f2": "\"text that spans multiple\nLines and more\nLines ane more and more\nLines ...\nAnd yet more lines\nAnd more\"", "f3": "2011-09-19 01:09:09" }
+{ "f1": "25", "f2": "Text \"\nmore lines", "f3": "2011-09-19 01:09:09" }
+{ "f1": "26", "f2": "\"\n", "f3": "2011-09-19 01:09:09" }
+{ "f1": "27", "f2": "Text", "f3": "" }
+{ "f1": "28", "f2": "Text", "f3": "2011-09-19 01:09:09" }
+{ "f1": "29", "f2": "Text\\.", "f3": "2011-09-19 01:09:09" }
+{ "f1": "30", "f2": "Text\\.", "f3": "2011-09-19 01:09:09" }
+{ "f1": "31", "f2": "\\.Text", "f3": "2011-09-19 01:09:09" }
+{ "f1": "32", "f2": "\\.Text", "f3": "2011-09-19 01:09:09" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.4.adm
new file mode 100644
index 0000000..5c61b4a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.4.adm
@@ -0,0 +1,4 @@
+{ "f1": "1", "f2": ",\", b", "f3": " 3", "f4": "4", "f5": "5" }
+{ "f1": ",\", b", "f2": "4", "f3": " 3", "f4": "4", "f5": "5" }
+{ "f1": "", "f2": "", "f3": "", "f4": "", "f5": "" }
+{ "f1": "dd", "f2": "", "f3": "", "f4": "", "f5": "" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.5.adm
new file mode 100644
index 0000000..4b80e26
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.5.adm
@@ -0,0 +1,13 @@
+{ "f1": 1, "f2": true, "f3": "text" }
+{ "f1": 2, "f2": false, "f3": "text" }
+{ "f1": 3, "f2": true, "f3": "text" }
+{ "f1": 4, "f2": true, "f3": null }
+{ "f1": 5, "f2": false, "f3": null }
+{ "f1": 6, "f2": true, "f3": "text\"\nmore lines" }
+{ "f1": 7, "f2": false, "f3": "\"\n" }
+{ "f1": 8, "f2": true, "f3": null }
+{ "f1": 9, "f2": false, "f3": "text\"" }
+{ "f1": 10, "f2": false, "f3": "text\\." }
+{ "f1": 11, "f2": true, "f3": "text\\." }
+{ "f1": null, "f2": false, "f3": "\\.text" }
+{ "f1": 13, "f2": true, "f3": "\\.text" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm
new file mode 100644
index 0000000..9a1d1c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm
@@ -0,0 +1,3 @@
+{ "f1": "1", "f2": "", "f3": "good", "f4": "recommend" }
+{ "f1": "2", "f2": "", "f3": "bad", "f4": "not recommend" }
+{ "f1": "3", "f2": "", "f3": "good", "f4": "" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/tsv-parser-001/tsv-parser-001.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/tsv-parser-001/tsv-parser-001.2.adm
new file mode 100644
index 0000000..fbe287b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/tsv-parser-001/tsv-parser-001.2.adm
@@ -0,0 +1,28 @@
+{ "f1": 11, "f2": 55, "f3": "text field wih , charrrrrrrrrrr", "f4": true, "f5": 90, "f6": 0.666666667 }
+{ "f1": 12, "f2": 55, "f3": "text field with \" charrrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 14, "f2": 55, "f3": "text field with ' charrrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 15, "f2": 55, "f3": "text field with \\ charrrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 16, "f2": 55, "f3": "text field wih \\, char         ", "f4": true, "f5": 90, "f6": 0.666666667 }
+{ "f1": 17, "f2": 55, "f3": "text field with \\\" charrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 18, "f2": 55, "f3": "text field with \\' charrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 19, "f2": 55, "f3": "text field with \\\\ charrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 20, "f2": 55, "f3": "text field ending with  charr ,", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 21, "f2": 55, "f3": "text field ending with  charr \"", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 22, "f2": 55, "f3": "text field ending with  charr '", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 23, "f2": 55, "f3": "text field ending with  charr \\", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 24, "f2": 55, "f3": "text field ending with charr \\,", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 25, "f2": 55, "f3": "text field ending with charr \\\"", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 26, "f2": 55, "f3": "text field ending with charr \\'", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 27, "f2": 55, "f3": "text field ending with charr \\\\", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 28, "f2": 55, "f3": ",text field starting with charr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 29, "f2": 55, "f3": "\"text field starting with charr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 30, "f2": 55, "f3": "'text field starting with charr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 31, "f2": 55, "f3": "\\text field starting with charr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 32, "f2": 55, "f3": "\\,text field starting with char", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 33, "f2": 55, "f3": "\\\"text field starting with char", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 34, "f2": 55, "f3": "\\'text field starting with char", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 35, "f2": 55, "f3": "\\\\text field starting with char", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 36, "f2": 55, "f3": "\"text field inside   with char\"", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 37, "f2": 55, "f3": "  text field with charrrrrrrrr ", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 38, "f2": 55, "f3": "text field with \"\" charrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 39, "f2": 55, "f3": "text field \"with\" charrrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/001/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/001/external_dataset.001.adm
new file mode 100644
index 0000000..93d1b57
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/001/external_dataset.001.adm
@@ -0,0 +1,6 @@
+{ "id": 1, "year": null, "review": "good", "details": "recommend" }
+{ "id": 2, "year": null, "review": "bad", "details": "not recommend" }
+{ "id": 3, "year": null, "review": "good", "details": null }
+{ "id": 4, "year": 2018, "review": "good", "details": "recommend" }
+{ "id": 5, "year": 2018, "review": "", "details": "not recommend" }
+{ "id": 6, "year": 2018, "review": "good", "details": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/002/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/002/external_dataset.001.adm
new file mode 100644
index 0000000..1954b05
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/002/external_dataset.001.adm
@@ -0,0 +1,6 @@
+{ "id": 1, "year": null, "review": "\"good\"", "details": "\"recommend\"" }
+{ "id": 2, "year": null, "review": "\"bad\"", "details": "\"not recommend\"" }
+{ "id": 3, "year": null, "review": "\"good\"", "details": "\"recommend\"" }
+{ "id": 4, "year": 2018, "review": "\"good\"", "details": "\"recommend\"" }
+{ "id": 5, "year": 2018, "review": "", "details": "\"not recommend\"" }
+{ "id": 6, "year": 2018, "review": "\"good\"", "details": "\"recommend\"" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.3.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.1.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.3.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.4.adm
new file mode 100644
index 0000000..27ec71f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.4.adm
@@ -0,0 +1 @@
+{ "ta1": 3, "ta2": 2, "ta3": 1, "ta4": true, "tb1": 3, "tb2": 2, "tb3": 1, "tb4": true }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_nested/union_nested.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_nested/union_nested.3.adm
new file mode 100644
index 0000000..4737f9e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_nested/union_nested.3.adm
@@ -0,0 +1,138 @@
+{ "agg_sum": 25000000, "two": -1, "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12502500, "two": -1, "four": -3, "ten": null, "twenty": null }
+{ "agg_sum": 2504500, "two": -1, "four": -3, "ten": -9, "twenty": null }
+{ "agg_sum": 2504500, "two": -1, "four": -3, "ten": -9, "twenty": -19 }
+{ "agg_sum": 2498500, "two": -1, "four": -3, "ten": -7, "twenty": null }
+{ "agg_sum": 2498500, "two": -1, "four": -3, "ten": -7, "twenty": -7 }
+{ "agg_sum": 2502500, "two": -1, "four": -3, "ten": -5, "twenty": null }
+{ "agg_sum": 2502500, "two": -1, "four": -3, "ten": -5, "twenty": -15 }
+{ "agg_sum": 2496500, "two": -1, "four": -3, "ten": -3, "twenty": null }
+{ "agg_sum": 2496500, "two": -1, "four": -3, "ten": -3, "twenty": -3 }
+{ "agg_sum": 2500500, "two": -1, "four": -3, "ten": -1, "twenty": null }
+{ "agg_sum": 2500500, "two": -1, "four": -3, "ten": -1, "twenty": -11 }
+{ "agg_sum": 12497500, "two": -1, "four": -1, "ten": null, "twenty": null }
+{ "agg_sum": 2499500, "two": -1, "four": -1, "ten": -9, "twenty": null }
+{ "agg_sum": 2499500, "two": -1, "four": -1, "ten": -9, "twenty": -9 }
+{ "agg_sum": 2503500, "two": -1, "four": -1, "ten": -7, "twenty": null }
+{ "agg_sum": 2503500, "two": -1, "four": -1, "ten": -7, "twenty": -17 }
+{ "agg_sum": 2497500, "two": -1, "four": -1, "ten": -5, "twenty": null }
+{ "agg_sum": 2497500, "two": -1, "four": -1, "ten": -5, "twenty": -5 }
+{ "agg_sum": 2501500, "two": -1, "four": -1, "ten": -3, "twenty": null }
+{ "agg_sum": 2501500, "two": -1, "four": -1, "ten": -3, "twenty": -13 }
+{ "agg_sum": 2495500, "two": -1, "four": -1, "ten": -1, "twenty": null }
+{ "agg_sum": 2495500, "two": -1, "four": -1, "ten": -1, "twenty": -1 }
+{ "agg_sum": 24995000, "two": 0, "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 24995000, "two": 0, "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12500000, "two": 0, "four": -2, "ten": null, "twenty": null }
+{ "agg_sum": 2504000, "two": 0, "four": -2, "ten": -8, "twenty": null }
+{ "agg_sum": 2504000, "two": 0, "four": -2, "ten": -8, "twenty": -18 }
+{ "agg_sum": 2498000, "two": 0, "four": -2, "ten": -6, "twenty": null }
+{ "agg_sum": 2498000, "two": 0, "four": -2, "ten": -6, "twenty": -6 }
+{ "agg_sum": 2502000, "two": 0, "four": -2, "ten": -4, "twenty": null }
+{ "agg_sum": 2502000, "two": 0, "four": -2, "ten": -4, "twenty": -14 }
+{ "agg_sum": 2496000, "two": 0, "four": -2, "ten": -2, "twenty": null }
+{ "agg_sum": 2496000, "two": 0, "four": -2, "ten": -2, "twenty": -2 }
+{ "agg_sum": 2500000, "two": 0, "four": -2, "ten": 0, "twenty": null }
+{ "agg_sum": 2500000, "two": 0, "four": -2, "ten": 0, "twenty": -10 }
+{ "agg_sum": 12495000, "two": 0, "four": 0, "ten": null, "twenty": null }
+{ "agg_sum": 12495000, "two": 0, "four": 0, "ten": null, "twenty": null }
+{ "agg_sum": 2499000, "two": 0, "four": 0, "ten": -8, "twenty": null }
+{ "agg_sum": 2499000, "two": 0, "four": 0, "ten": -8, "twenty": -8 }
+{ "agg_sum": 2503000, "two": 0, "four": 0, "ten": -6, "twenty": null }
+{ "agg_sum": 2503000, "two": 0, "four": 0, "ten": -6, "twenty": -16 }
+{ "agg_sum": 2497000, "two": 0, "four": 0, "ten": -4, "twenty": null }
+{ "agg_sum": 2497000, "two": 0, "four": 0, "ten": -4, "twenty": -4 }
+{ "agg_sum": 2501000, "two": 0, "four": 0, "ten": -2, "twenty": null }
+{ "agg_sum": 2501000, "two": 0, "four": 0, "ten": -2, "twenty": -12 }
+{ "agg_sum": 2495000, "two": 0, "four": 0, "ten": 0, "twenty": null }
+{ "agg_sum": 2495000, "two": 0, "four": 0, "ten": 0, "twenty": null }
+{ "agg_sum": 2495000, "two": 0, "four": 0, "ten": 0, "twenty": 0 }
+{ "agg_sum": 2495000, "two": 0, "four": 0, "ten": 0, "twenty": 0 }
+{ "agg_sum": 2501000, "two": 0, "four": 0, "ten": 2, "twenty": null }
+{ "agg_sum": 2501000, "two": 0, "four": 0, "ten": 2, "twenty": 12 }
+{ "agg_sum": 2497000, "two": 0, "four": 0, "ten": 4, "twenty": null }
+{ "agg_sum": 2497000, "two": 0, "four": 0, "ten": 4, "twenty": 4 }
+{ "agg_sum": 2503000, "two": 0, "four": 0, "ten": 6, "twenty": null }
+{ "agg_sum": 2503000, "two": 0, "four": 0, "ten": 6, "twenty": 16 }
+{ "agg_sum": 2499000, "two": 0, "four": 0, "ten": 8, "twenty": null }
+{ "agg_sum": 2499000, "two": 0, "four": 0, "ten": 8, "twenty": 8 }
+{ "agg_sum": 12500000, "two": 0, "four": 2, "ten": null, "twenty": null }
+{ "agg_sum": 2500000, "two": 0, "four": 2, "ten": 0, "twenty": null }
+{ "agg_sum": 2500000, "two": 0, "four": 2, "ten": 0, "twenty": 10 }
+{ "agg_sum": 2496000, "two": 0, "four": 2, "ten": 2, "twenty": null }
+{ "agg_sum": 2496000, "two": 0, "four": 2, "ten": 2, "twenty": 2 }
+{ "agg_sum": 2502000, "two": 0, "four": 2, "ten": 4, "twenty": null }
+{ "agg_sum": 2502000, "two": 0, "four": 2, "ten": 4, "twenty": 14 }
+{ "agg_sum": 2498000, "two": 0, "four": 2, "ten": 6, "twenty": null }
+{ "agg_sum": 2498000, "two": 0, "four": 2, "ten": 6, "twenty": 6 }
+{ "agg_sum": 2504000, "two": 0, "four": 2, "ten": 8, "twenty": null }
+{ "agg_sum": 2504000, "two": 0, "four": 2, "ten": 8, "twenty": 18 }
+{ "agg_sum": 25000000, "two": 1, "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12497500, "two": 1, "four": 1, "ten": null, "twenty": null }
+{ "agg_sum": 2495500, "two": 1, "four": 1, "ten": 1, "twenty": null }
+{ "agg_sum": 2495500, "two": 1, "four": 1, "ten": 1, "twenty": 1 }
+{ "agg_sum": 2501500, "two": 1, "four": 1, "ten": 3, "twenty": null }
+{ "agg_sum": 2501500, "two": 1, "four": 1, "ten": 3, "twenty": 13 }
+{ "agg_sum": 2497500, "two": 1, "four": 1, "ten": 5, "twenty": null }
+{ "agg_sum": 2497500, "two": 1, "four": 1, "ten": 5, "twenty": 5 }
+{ "agg_sum": 2503500, "two": 1, "four": 1, "ten": 7, "twenty": null }
+{ "agg_sum": 2503500, "two": 1, "four": 1, "ten": 7, "twenty": 17 }
+{ "agg_sum": 2499500, "two": 1, "four": 1, "ten": 9, "twenty": null }
+{ "agg_sum": 2499500, "two": 1, "four": 1, "ten": 9, "twenty": 9 }
+{ "agg_sum": 12502500, "two": 1, "four": 3, "ten": null, "twenty": null }
+{ "agg_sum": 2500500, "two": 1, "four": 3, "ten": 1, "twenty": null }
+{ "agg_sum": 2500500, "two": 1, "four": 3, "ten": 1, "twenty": 11 }
+{ "agg_sum": 2496500, "two": 1, "four": 3, "ten": 3, "twenty": null }
+{ "agg_sum": 2496500, "two": 1, "four": 3, "ten": 3, "twenty": 3 }
+{ "agg_sum": 2502500, "two": 1, "four": 3, "ten": 5, "twenty": null }
+{ "agg_sum": 2502500, "two": 1, "four": 3, "ten": 5, "twenty": 15 }
+{ "agg_sum": 2498500, "two": 1, "four": 3, "ten": 7, "twenty": null }
+{ "agg_sum": 2498500, "two": 1, "four": 3, "ten": 7, "twenty": 7 }
+{ "agg_sum": 2504500, "two": 1, "four": 3, "ten": 9, "twenty": null }
+{ "agg_sum": 2504500, "two": 1, "four": 3, "ten": 9, "twenty": 19 }
+{ "agg_sum": 24995000, "two": "0", "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12495000, "two": "0", "four": "0", "ten": null, "twenty": null }
+{ "agg_sum": 2495000, "two": "0", "four": "0", "ten": "0", "twenty": null }
+{ "agg_sum": 2495000, "two": "0", "four": "0", "ten": "0", "twenty": "0" }
+{ "agg_sum": 2501000, "two": "0", "four": "0", "ten": "2", "twenty": null }
+{ "agg_sum": 2501000, "two": "0", "four": "0", "ten": "2", "twenty": "12" }
+{ "agg_sum": 2497000, "two": "0", "four": "0", "ten": "4", "twenty": null }
+{ "agg_sum": 2497000, "two": "0", "four": "0", "ten": "4", "twenty": "4" }
+{ "agg_sum": 2503000, "two": "0", "four": "0", "ten": "6", "twenty": null }
+{ "agg_sum": 2503000, "two": "0", "four": "0", "ten": "6", "twenty": "16" }
+{ "agg_sum": 2499000, "two": "0", "four": "0", "ten": "8", "twenty": null }
+{ "agg_sum": 2499000, "two": "0", "four": "0", "ten": "8", "twenty": "8" }
+{ "agg_sum": 12500000, "two": "0", "four": "2", "ten": null, "twenty": null }
+{ "agg_sum": 2500000, "two": "0", "four": "2", "ten": "0", "twenty": null }
+{ "agg_sum": 2500000, "two": "0", "four": "2", "ten": "0", "twenty": "10" }
+{ "agg_sum": 2496000, "two": "0", "four": "2", "ten": "2", "twenty": null }
+{ "agg_sum": 2496000, "two": "0", "four": "2", "ten": "2", "twenty": "2" }
+{ "agg_sum": 2502000, "two": "0", "four": "2", "ten": "4", "twenty": null }
+{ "agg_sum": 2502000, "two": "0", "four": "2", "ten": "4", "twenty": "14" }
+{ "agg_sum": 2498000, "two": "0", "four": "2", "ten": "6", "twenty": null }
+{ "agg_sum": 2498000, "two": "0", "four": "2", "ten": "6", "twenty": "6" }
+{ "agg_sum": 2504000, "two": "0", "four": "2", "ten": "8", "twenty": null }
+{ "agg_sum": 2504000, "two": "0", "four": "2", "ten": "8", "twenty": "18" }
+{ "agg_sum": 25000000, "two": "1", "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12497500, "two": "1", "four": "1", "ten": null, "twenty": null }
+{ "agg_sum": 2495500, "two": "1", "four": "1", "ten": "1", "twenty": null }
+{ "agg_sum": 2495500, "two": "1", "four": "1", "ten": "1", "twenty": "1" }
+{ "agg_sum": 2501500, "two": "1", "four": "1", "ten": "3", "twenty": null }
+{ "agg_sum": 2501500, "two": "1", "four": "1", "ten": "3", "twenty": "13" }
+{ "agg_sum": 2497500, "two": "1", "four": "1", "ten": "5", "twenty": null }
+{ "agg_sum": 2497500, "two": "1", "four": "1", "ten": "5", "twenty": "5" }
+{ "agg_sum": 2503500, "two": "1", "four": "1", "ten": "7", "twenty": null }
+{ "agg_sum": 2503500, "two": "1", "four": "1", "ten": "7", "twenty": "17" }
+{ "agg_sum": 2499500, "two": "1", "four": "1", "ten": "9", "twenty": null }
+{ "agg_sum": 2499500, "two": "1", "four": "1", "ten": "9", "twenty": "9" }
+{ "agg_sum": 12502500, "two": "1", "four": "3", "ten": null, "twenty": null }
+{ "agg_sum": 2500500, "two": "1", "four": "3", "ten": "1", "twenty": null }
+{ "agg_sum": 2500500, "two": "1", "four": "3", "ten": "1", "twenty": "11" }
+{ "agg_sum": 2496500, "two": "1", "four": "3", "ten": "3", "twenty": null }
+{ "agg_sum": 2496500, "two": "1", "four": "3", "ten": "3", "twenty": "3" }
+{ "agg_sum": 2502500, "two": "1", "four": "3", "ten": "5", "twenty": null }
+{ "agg_sum": 2502500, "two": "1", "four": "3", "ten": "5", "twenty": "15" }
+{ "agg_sum": 2498500, "two": "1", "four": "3", "ten": "7", "twenty": null }
+{ "agg_sum": 2498500, "two": "1", "four": "3", "ten": "7", "twenty": "7" }
+{ "agg_sum": 2504500, "two": "1", "four": "3", "ten": "9", "twenty": null }
+{ "agg_sum": 2504500, "two": "1", "four": "3", "ten": "9", "twenty": "19" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/numeric/add_double/add_double.1.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/numeric/add_double/add_double.1.ast
index 4550cfe..7e1443c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/numeric/add_double/add_double.1.ast
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/numeric/add_double/add_double.1.ast
@@ -97,12 +97,7 @@
         LiteralExpr [STRING] [-6.5d]
       ]
       +
-      IndexAccessor [
-        OrderedListConstructor [
-          LiteralExpr [DOUBLE] [1.0]
-        ]
-        Index:         - LiteralExpr [LONG] [1]
-      ]
+      LiteralExpr [MISSING]
     ]
   )
 ]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
index cd1fb12..2456f13 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -17,12 +17,28 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp">
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
   <test-group name="external-dataset">
     <test-case FilePath="external-dataset">
       <compilation-unit name="aws/s3/000">
         <output-dir compare="Text">aws/s3/000</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="aws/s3/001">
+        <output-dir compare="Text">aws/s3/001</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="aws/s3/002">
+        <output-dir compare="Text">aws/s3/002</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="aws/s3/negative">
+        <output-dir compare="Text">aws/s3/negative</output-dir>
+        <expected-error>Parameter(s) format must be specified</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index ec6c24f..bcaa397 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -13141,6 +13141,18 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="csv-tsv-parser">
+    <test-case FilePath="csv-tsv-parser">
+      <compilation-unit name="csv-parser-001">
+        <output-dir compare="Text">csv-parser-001</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="csv-tsv-parser">
+      <compilation-unit name="tsv-parser-001">
+        <output-dir compare="Text">tsv-parser-001</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
   <test-group name="binary">
     <test-case FilePath="binary">
       <compilation-unit name="parse">
@@ -13234,6 +13246,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="union">
+      <compilation-unit name="union_nested">
+        <output-dir compare="Text">union_nested</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="union">
       <compilation-unit name="union_orderby">
         <output-dir compare="Text">union_orderby</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 7fef69e..4e10498 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -248,7 +248,7 @@
     public static final int LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_NAME = 3047;
     public static final int OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME = 3048;
     public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER = 3049;
-    public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE = 3050;
+    public static final int PARSER_INVALID_CHAR_LENGTH = 3050;
     public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH = 3051;
     public static final int INDEXING_EXTERNAL_FILE_INDEX_ACCESSOR_UNABLE_TO_FIND_FILE_INDEX = 3052;
     public static final int PARSER_ADM_DATA_PARSER_FIELD_NOT_NULL = 3053;
@@ -314,7 +314,9 @@
     public static final int FAILED_TO_PARSE_METADATA = 3115;
     public static final int INPUT_DECODE_FAILURE = 3116;
     public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
-    public static final int METADATA_DROP_LIBRARY_IN_USE = 3118;
+    public static final int PARAMETERS_REQUIRED = 3118;
+    public static final int MALFORMED_RECORD = 3119;
+    public static final int METADATA_DROP_LIBRARY_IN_USE = 3120;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 4461ff1..48d849f 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -246,7 +246,7 @@
 3047 = External %1$s not supported
 3048 = Invalid feed runtime: %1$s
 3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1
-3050 = '%1$s' is not a valid quote. The length of a quote should be 1
+3050 = '%1$s' is not a valid %2$s. The length of %2$s should be 1
 3051 = Quote '%1$s' cannot be used with the delimiter '%2$s'
 3052 = Was not able to find a file in the files index
 3053 = Field %1$s can not be null
@@ -312,7 +312,9 @@
 3115 = Failed to parse record metadata
 3116 = Failed to decode input
 3117 = Failed to parse record, malformed log record
-3118 = Library %1$s is being used. It cannot be dropped
+3118 = Parameter(s) %1$s must be specified
+3119 = Record number %1$s is malformed
+3120 = Library %1$s is being used. It cannot be dropped
 
 # Lifecycle management errors
 4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-doc/src/main/markdown/sqlpp/2_expr.md b/asterixdb/asterix-doc/src/main/markdown/sqlpp/2_expr.md
index 37e7f79..2e0b526 100644
--- a/asterixdb/asterix-doc/src/main/markdown/sqlpp/2_expr.md
+++ b/asterixdb/asterix-doc/src/main/markdown/sqlpp/2_expr.md
@@ -218,8 +218,9 @@
 single element from an array, or a whole subset of an array. Accessing a single element is achieved by
 providing a single index argument (zero-based element position), while obtaining a subset of an array is achieved by
 providing the `start` and `end` (zero-based) index positions; the returned subset is from position `start` to position
-`end - 1`; the `end` position argument is optional. Multisets have similar behavior to arrays, except for retrieving
-arbitrary items as the order of items is not fixed in multisets.
+`end - 1`; the `end` position argument is optional. If a position argument is negative then the element position is
+counted from the end of the array (`-1` addresses the last element, `-2` the next-to-last element, and so on). Multisets have
+similar behavior to arrays, except for retrieving arbitrary items as the order of items is not fixed in multisets.
 
 Attempts to access non-existent fields or out-of-bound array elements produce the special value `MISSING`. Type errors
 will be raised for inappropriate use of a path expression, such as applying a field accessor to a numeric value.
@@ -232,12 +233,16 @@
     ({"name": "MyABCs", "array": [ "a", "b", "c"]}).array
 
     (["a", "b", "c"])[2]
+    
+    (["a", "b", "c"])[-1]
 
     ({"name": "MyABCs", "array": [ "a", "b", "c"]}).array[2]
 
     (["a", "b", "c"])[0:2]
 
     (["a", "b", "c"])[0:]
+    
+    (["a", "b", "c"])[-2:-1]
 
 
 ## <a id="Primary_expressions">Primary Expressions</a>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index f830376..7702dde 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -205,7 +205,7 @@
             IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
             if (recordReaderClazz != null) {
                 StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
-                streamReader.configure(createInputStream(ctx, partition, indexer), configuration);
+                streamReader.configure(ctx, createInputStream(ctx, partition, indexer), configuration);
                 if (indexer != null) {
                     return new IndexingStreamRecordReader(streamReader, indexer);
                 } else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index 65ecd8d..aa4abb4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -134,4 +134,10 @@
         strValue.getChars(0, strValue.length(), value, 0);
         this.size = strValue.length();
     }
+
+    public boolean isEmptyRecord() {
+        return size <= 0
+                || (size == 1 && (value[0] == ExternalDataConstants.LF || value[0] == ExternalDataConstants.CR))
+                || (size == 2 && value[0] == ExternalDataConstants.CR && value[1] == ExternalDataConstants.LF);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
index 8255ebb..5c8f219 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
@@ -41,7 +41,7 @@
         this.cursor = new FieldCursorForDelimitedDataParser(null, delimiter, ExternalDataConstants.QUOTE);
         this.record = new CharArrayRecord();
         this.valueIndex = valueIndex;
-        this.recordWithMetadata = new RecordWithMetadataAndPK<char[]>(record, metaType.getFieldTypes(), recordType,
+        this.recordWithMetadata = new RecordWithMetadataAndPK<>(record, metaType.getFieldTypes(), recordType,
                 keyIndicator, keyIndexes, keyTypes);
     }
 
@@ -53,16 +53,15 @@
         int i = 0;
         int j = 0;
         while (cursor.nextField()) {
-            if (cursor.isDoubleQuoteIncludedInThisField) {
-                cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
-                cursor.fEnd -= cursor.doubleQuoteCount;
-                cursor.isDoubleQuoteIncludedInThisField = false;
+            if (cursor.fieldHasDoubleQuote()) {
+                cursor.eliminateDoubleQuote();
             }
             if (i == valueIndex) {
-                record.setValue(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                record.setValue(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength());
                 record.endRecord();
             } else {
-                recordWithMetadata.setRawMetadata(j, cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                recordWithMetadata.setRawMetadata(j, cursor.getBuffer(), cursor.getFieldStart(),
+                        cursor.getFieldLength());
                 j++;
             }
             i++;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index a9f7898..6b8bb59 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
@@ -35,7 +36,6 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -77,8 +77,7 @@
     }
 
     @Override
-    public void configure(IServiceContext ctx, Map<String, String> configuration)
-            throws AlgebricksException, HyracksDataException {
+    public void configure(IServiceContext ctx, Map<String, String> configuration) throws AlgebricksException {
         this.configuration = configuration;
         ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
 
@@ -115,13 +114,13 @@
      *
      * @return A list of string paths that point to files only
      *
-     * @throws HyracksDataException HyracksDataException
+     * @throws AsterixException if no file extension is known for the given file format
      */
-    private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws HyracksDataException {
+    private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws AsterixException {
         List<S3Object> filesOnly = new ArrayList<>();
         String fileExtension = getFileExtension(fileFormat);
         if (fileExtension == null) {
-            throw HyracksDataException.create(ErrorCode.INVALID_FORMAT);
+            throw AsterixException.create(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, fileFormat);
         }
 
         s3Objects.stream().filter(object -> object.key().endsWith(fileExtension)).forEach(filesOnly::add);
@@ -214,8 +213,12 @@
      */
     private String getFileExtension(String format) {
         switch (format.toLowerCase()) {
-            case "json":
+            case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
                 return ".json";
+            case ExternalDataConstants.FORMAT_CSV:
+                return ".csv";
+            case ExternalDataConstants.FORMAT_TSV:
+                return ".tsv";
             default:
                 return null;
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
index e78783a..6484d4e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
@@ -18,20 +18,16 @@
  */
 package org.apache.asterix.external.input.record.reader.aws;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class AwsS3ReaderFactory extends StreamRecordReaderFactory {
@@ -73,18 +69,4 @@
         // record reader
         recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
     }
-
-    @Override
-    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException {
-        try {
-            StreamRecordReader streamRecordReader =
-                    (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
-            streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
-            return streamRecordReader;
-        } catch (InstantiationException | IllegalAccessException | InvocationTargetException
-                | NoSuchMethodException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 07b6250..24a68a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -26,6 +26,7 @@
 
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
 
@@ -135,7 +136,7 @@
     }
 
     @Override
-    public void configure(AsterixInputStream inputStream, Map<String, String> config) {
+    public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) {
         super.configure(inputStream);
         this.config = config;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 0b41d4b..a27397e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class LineRecordReader extends StreamRecordReader {
@@ -36,15 +37,18 @@
     protected int newlineLength;
     protected int recordNumber = 0;
     protected boolean nextIsHeader = false;
-    private static final List<String> recordReaderFormats = Collections.unmodifiableList(
-            Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
+    private static final List<String> recordReaderFormats =
+            Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT,
+                    ExternalDataConstants.FORMAT_CSV, ExternalDataConstants.FORMAT_TSV));
     private static final String REQUIRED_CONFIGS = "";
 
     @Override
-    public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
+    public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
+            throws HyracksDataException {
         super.configure(inputStream);
         this.hasHeader = ExternalDataUtils.hasHeader(config);
         if (hasHeader) {
+            // TODO(ali): revisit this and notifyNewSource
             inputStream.setNotificationHandler(this);
         }
     }
@@ -99,13 +103,16 @@
                     startPosn = bufferPosn = 0;
                     bufferLength = reader.read(inputBuffer);
                     if (bufferLength <= 0) {
-                        if (readLength > 0) {
-                            record.endRecord();
-                            recordNumber++;
-                            return true;
+                        if (readLength <= 0) {
+                            close();
+                            return false; //EOF
                         }
-                        close();
-                        return false; //EOF
+                        record.endRecord();
+                        if (record.isEmptyRecord()) {
+                            return false;
+                        }
+                        recordNumber++;
+                        return true;
                     }
                 }
                 for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
@@ -129,6 +136,9 @@
                     record.append(inputBuffer, startPosn, readLength);
                 }
             } while (newlineLength == 0);
+            if (record.isEmptyRecord()) {
+                continue;
+            }
             if (nextIsHeader) {
                 nextIsHeader = false;
                 continue;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 4c4128a..564df4b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -24,29 +24,35 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class QuotedLineRecordReader extends LineRecordReader {
 
     private char quote;
-    private boolean prevCharEscape;
-    private boolean inQuote;
+    private char quoteEscape;
+    private IWarningCollector warningCollector;
+    private final SourceLocation srcLoc = new SourceLocation(-1, -1);
     private static final List<String> recordReaderFormats = Collections.unmodifiableList(
             Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
-    private static final String REQUIRED_CONFIGS = "quote";
+    private static final String REQUIRED_CONFIGS = ExternalDataConstants.KEY_QUOTE;
 
     @Override
-    public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
-        super.configure(inputStream, config);
+    public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
+            throws HyracksDataException {
+        super.configure(ctx, inputStream, config);
+        this.warningCollector = ctx.getWarningCollector();
         String quoteString = config.get(ExternalDataConstants.KEY_QUOTE);
-        if (quoteString.length() != 1) {
-            throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE,
-                    ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
-        }
+        ExternalDataUtils.validateQuote(quoteString);
         this.quote = quoteString.charAt(0);
+        this.quoteEscape = ExternalDataUtils.validateGetQuoteEscape(config);
     }
 
     @Override
@@ -67,31 +73,41 @@
             }
             newlineLength = 0;
             prevCharCR = false;
-            prevCharEscape = false;
+            boolean prevCharEscape = false;
             record.reset();
             int readLength = 0;
-            inQuote = false;
+            boolean inQuote = false;
             do {
                 int startPosn = bufferPosn;
                 if (bufferPosn >= bufferLength) {
                     startPosn = bufferPosn = 0;
                     bufferLength = reader.read(inputBuffer);
                     if (bufferLength <= 0) {
-                        {
-                            if (readLength > 0) {
-                                if (inQuote) {
-                                    throw new IOException("malformed input record ended inside quote");
-                                }
-                                record.endRecord();
-                                recordNumber++;
-                                return true;
+                        // reached end of stream
+                        if (readLength <= 0 || inQuote) {
+                            // nothing has been read yet OR the stream ended in the middle of a quoted section
+                            if (inQuote && warningCollector.shouldWarn()) {
+                                warningCollector
+                                        .warn(WarningUtil.forAsterix(srcLoc, ErrorCode.MALFORMED_RECORD, recordNumber));
                             }
                             close();
                             return false;
                         }
+                        record.endRecord();
+                        if (record.isEmptyRecord()) {
+                            return false;
+                        }
+                        recordNumber++;
+                        return true;
                     }
                 }
+                boolean maybeInQuote = false;
                 for (; bufferPosn < bufferLength; ++bufferPosn) {
+                    if (inputBuffer[bufferPosn] == quote && quoteEscape == quote) {
+                        inQuote |= maybeInQuote;
+                        prevCharEscape |= maybeInQuote;
+                    }
+                    maybeInQuote = false;
                     if (!inQuote) {
                         if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
                             newlineLength = (prevCharCR) ? 2 : 1;
@@ -103,24 +119,22 @@
                             break;
                         }
                         prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
-                        if (inputBuffer[bufferPosn] == quote) {
-                            if (!prevCharEscape) {
-                                inQuote = true;
-                            }
+                        if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
+                            // this is an opening quote
+                            inQuote = true;
                         }
-                        if (prevCharEscape) {
-                            prevCharEscape = false;
-                        } else {
-                            prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
-                        }
+                        // the quoteEscape != quote check keeps an opening quote from being treated as an escape
+                        prevCharEscape =
+                                inputBuffer[bufferPosn] == quoteEscape && !prevCharEscape && quoteEscape != quote;
                     } else {
-                        // only look for next quote
-                        if (inputBuffer[bufferPosn] == quote) {
-                            if (!prevCharEscape) {
-                                inQuote = false;
-                            }
+                        // if quote == quoteEscape and current char is quote, then it could be closing or escaping
+                        if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
+                            // this is most likely a closing quote. the outcome depends on the next char
+                            inQuote = false;
+                            maybeInQuote = true;
                         }
-                        prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+                        prevCharEscape =
+                                inputBuffer[bufferPosn] == quoteEscape && !prevCharEscape && quoteEscape != quote;
                     }
                 }
                 readLength = bufferPosn - startPosn;
@@ -131,6 +145,9 @@
                     record.append(inputBuffer, startPosn, readLength);
                 }
             } while (newlineLength == 0);
+            if (record.isEmptyRecord()) {
+                continue;
+            }
             if (nextIsHeader) {
                 nextIsHeader = false;
                 continue;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 883f0ee..5ab5730 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class SemiStructuredRecordReader extends StreamRecordReader {
@@ -45,7 +46,8 @@
     private static final String REQUIRED_CONFIGS = "";
 
     @Override
-    public void configure(AsterixInputStream stream, Map<String, String> config) throws HyracksDataException {
+    public void configure(IHyracksTaskContext ctx, AsterixInputStream stream, Map<String, String> config)
+            throws HyracksDataException {
         super.configure(stream);
         String recStartString = config.get(ExternalDataConstants.KEY_RECORD_START);
         String recEndString = config.get(ExternalDataConstants.KEY_RECORD_END);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 5629f48..4aed741 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler {
@@ -100,6 +101,6 @@
 
     public abstract String getRequiredConfigs();
 
-    public abstract void configure(AsterixInputStream inputStream, Map<String, String> config)
+    public abstract void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
             throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 076842e..9bb50f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -95,7 +95,7 @@
         try {
             StreamRecordReader streamRecordReader =
                     (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
-            streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
+            streamRecordReader.configure(ctx, streamFactory.createInputStream(ctx, partition), configuration);
             return streamRecordReader;
         } catch (InstantiationException | IllegalAccessException | InvocationTargetException
                 | NoSuchMethodException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index 8e166c0..d35ad26 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -68,7 +68,7 @@
     }
 
     @Override
-    public int read(char cbuf[], int offset, int length) throws IOException {
+    public int read(char[] cbuf, int offset, int length) throws IOException {
         if (done) {
             return -1;
         }
@@ -84,7 +84,9 @@
                 } else {
                     // need to read more data
                     System.arraycopy(bytes, byteBuffer.position(), bytes, 0, byteBuffer.remaining());
+                    len = 0; // reset to read more bytes
                     byteBuffer.position(byteBuffer.remaining());
+                    byteBuffer.limit(byteBuffer.capacity()); //set limit to capacity for the new bytes
                     while (len == 0) {
                         len = in.read(bytes, byteBuffer.position(), bytes.length - byteBuffer.position());
                     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index fb9a4a6..0fe81cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -137,6 +137,11 @@
 
     @Override
     public boolean handleException(Throwable th) {
+        if (th instanceof RuntimeException) {
+            //Log runtime exception
+            //TODO Should we continue anyway?
+            LOGGER.error("Encountered an unexpected error", th);
+        }
         try {
             return accept();
         } catch (IOException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 4e371c8..505acbd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
@@ -43,23 +44,24 @@
 
 public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
 
+    private final IHyracksTaskContext ctx;
     private final char fieldDelimiter;
     private final char quote;
     private final boolean hasHeader;
-    private ARecordType recordType;
-    private IARecordBuilder recBuilder;
-    private ArrayBackedValueStorage fieldValueBuffer;
-    private DataOutput fieldValueBufferOutput;
-    private IValueParser[] valueParsers;
+    private final ARecordType recordType;
+    private final IARecordBuilder recBuilder;
+    private final ArrayBackedValueStorage fieldValueBuffer;
+    private final DataOutput fieldValueBufferOutput;
+    private final IValueParser[] valueParsers;
     private FieldCursorForDelimitedDataParser cursor;
-    private byte[] fieldTypeTags;
-    private int[] fldIds;
-    private ArrayBackedValueStorage[] nameBuffers;
-    private boolean areAllNullFields;
+    private final byte[] fieldTypeTags;
+    private final int[] fldIds;
+    private final ArrayBackedValueStorage[] nameBuffers;
 
-    public DelimitedDataParser(IValueParserFactory[] valueParserFactories, char fieldDelimter, char quote,
-            boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
-        this.fieldDelimiter = fieldDelimter;
+    public DelimitedDataParser(IHyracksTaskContext ctx, IValueParserFactory[] valueParserFactories, char fieldDelimiter,
+            char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
+        this.ctx = ctx;
+        this.fieldDelimiter = fieldDelimiter;
         this.quote = quote;
         this.hasHeader = hasHeader;
         this.recordType = recordType;
@@ -98,7 +100,7 @@
             }
         }
         if (!isStreamParser) {
-            cursor = new FieldCursorForDelimitedDataParser(null, fieldDelimiter, quote);
+            cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote);
         }
     }
 
@@ -107,10 +109,8 @@
         try {
             while (cursor.nextRecord()) {
                 parseRecord();
-                if (!areAllNullFields) {
-                    recBuilder.write(out, true);
-                    return true;
-                }
+                recBuilder.write(out, true);
+                return true;
             }
             return false;
         } catch (IOException e) {
@@ -121,7 +121,6 @@
     private void parseRecord() throws HyracksDataException {
         recBuilder.reset(recordType);
         recBuilder.init();
-        areAllNullFields = true;
 
         for (int i = 0; i < valueParsers.length; ++i) {
             try {
@@ -134,27 +133,24 @@
             fieldValueBuffer.reset();
 
             try {
-                if (cursor.fStart == cursor.fEnd && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.STRING
+                if (cursor.isFieldEmpty() && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.STRING
                         && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.NULL) {
                     // if the field is empty and the type is optional, insert
                     // NULL. Note that string type can also process empty field as an
                     // empty string
                     if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
-                        throw new RuntimeDataException(ErrorCode.PARSER_DELIMITED_NONOPTIONAL_NULL, cursor.recordCount,
-                                cursor.fieldCount);
+                        throw new RuntimeDataException(ErrorCode.PARSER_DELIMITED_NONOPTIONAL_NULL,
+                                cursor.getRecordCount(), cursor.getFieldCount());
                     }
                     fieldValueBufferOutput.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                 } else {
                     fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
-                    // Eliminate doule quotes in the field that we are going to parse
-                    if (cursor.isDoubleQuoteIncludedInThisField) {
-                        cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
-                        cursor.fEnd -= cursor.doubleQuoteCount;
-                        cursor.isDoubleQuoteIncludedInThisField = false;
+                    // Eliminate double quotes in the field that we are going to parse
+                    if (cursor.fieldHasDoubleQuote()) {
+                        cursor.eliminateDoubleQuote();
                     }
-                    valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart,
+                    valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength(),
                             fieldValueBufferOutput);
-                    areAllNullFields = false;
                 }
                 if (fldIds[i] < 0) {
                     recBuilder.addField(nameBuffers[i], fieldValueBuffer);
@@ -165,19 +161,16 @@
                 throw HyracksDataException.create(e);
             }
         }
+        if (valueParsers.length != cursor.getFieldCount()) {
+            throw new HyracksDataException("Record #" + cursor.getRecordCount() + " is missing some fields");
+        }
     }
 
     @Override
     public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
-        try {
-            cursor.nextRecord(record.get(), record.size());
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
+        cursor.nextRecord(record.get(), record.size());
         parseRecord();
-        if (!areAllNullFields) {
-            recBuilder.write(out, true);
-        }
+        recBuilder.write(out, true);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
index f406729..46c5152 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -21,10 +21,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
@@ -40,19 +37,20 @@
 
     private static final long serialVersionUID = 1L;
     private static final List<String> parserFormats =
-            Collections.unmodifiableList(Arrays.asList("csv", "delimited-text"));
+            Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_CSV,
+                    ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_TSV));
 
     @Override
     public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
-        return createParser();
+        return createParser(ctx);
     }
 
-    private DelimitedDataParser createParser() throws HyracksDataException {
+    private DelimitedDataParser createParser(IHyracksTaskContext ctx) throws HyracksDataException {
         IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType);
-        Character delimiter = DelimitedDataParserFactory.getDelimiter(configuration);
-        char quote = DelimitedDataParserFactory.getQuote(configuration, delimiter);
+        char delimiter = ExternalDataUtils.validateGetDelimiter(configuration);
+        char quote = ExternalDataUtils.validateGetQuote(configuration, delimiter);
         boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
-        return new DelimitedDataParser(valueParserFactories, delimiter, quote, hasHeader, recordType,
+        return new DelimitedDataParser(ctx, valueParserFactories, delimiter, quote, hasHeader, recordType,
                 ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM));
     }
 
@@ -64,41 +62,7 @@
     @Override
     public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        return createParser();
-    }
-
-    // Get a delimiter from the given configuration
-    public static char getDelimiter(Map<String, String> configuration) throws HyracksDataException {
-        String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
-        if (delimiterValue == null) {
-            delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
-        } else if (delimiterValue.length() != 1) {
-            throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER,
-                    delimiterValue);
-        }
-        return delimiterValue.charAt(0);
-    }
-
-    // Get a quote from the given configuration when the delimiter is given
-    // Need to pass delimiter to check whether they share the same character
-    public static char getQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
-        String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
-        if (quoteValue == null) {
-            quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
-        } else if (quoteValue.length() != 1) {
-            throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE,
-                    quoteValue);
-        }
-
-        // Since delimiter (char type value) can't be null,
-        // we only check whether delimiter and quote use the same character
-        if (quoteValue.charAt(0) == delimiter) {
-            throw new RuntimeDataException(
-                    ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quoteValue,
-                    delimiter);
-        }
-
-        return quoteValue.charAt(0);
+        return createParser(ctx);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
index d34b5e0..704ea29 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -55,7 +55,7 @@
         String recordFormat = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT);
         if (recordFormat == null) {
             throw AlgebricksException.create(ErrorCode.UNKNOWN_RECORD_FORMAT_FOR_META_PARSER,
-                    ExternalDataConstants.KEY_FORMAT);
+                    ExternalDataConstants.KEY_RECORD_FORMAT);
         }
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
         if (format == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 414c460..27ac10e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -27,7 +27,7 @@
 import org.apache.asterix.external.api.IIndexingAdapterFactory;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -39,11 +39,15 @@
  */
 public class AdapterFactoryProvider {
 
-    // Adapters
+    private AdapterFactoryProvider() {
+    }
+
+    // get adapter factory. this method has the side effect of modifying the configuration as necessary
     public static ITypedAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName,
             Map<String, String> configuration, ARecordType itemType, ARecordType metaType)
             throws HyracksDataException, AlgebricksException {
-        ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
+        ExternalDataUtils.defaultConfiguration(configuration);
+        ExternalDataUtils.prepare(adapterName, configuration);
         ICcApplicationContext context = (ICcApplicationContext) serviceCtx.getApplicationContext();
         ITypedAdapterFactory adapterFactory =
                 (ITypedAdapterFactory) context.getAdapterFactoryService().createAdapterFactory();
@@ -53,11 +57,12 @@
         return adapterFactory;
     }
 
-    // Indexing Adapters
+    // get indexing adapter factory. this method has the side effect of modifying the configuration as necessary
     public static IIndexingAdapterFactory getIndexingAdapterFactory(IServiceContext serviceCtx, String adapterName,
             Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp,
             ARecordType metaType) throws HyracksDataException, AlgebricksException {
-        ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
+        ExternalDataUtils.defaultConfiguration(configuration);
+        ExternalDataUtils.prepare(adapterName, configuration);
         GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
         adapterFactory.setOutputType(itemType);
         adapterFactory.setMetaType(metaType);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 53cf6b1..2265a25 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -46,15 +46,12 @@
     public static IDataParserFactory getDataParserFactory(ILibraryManager libraryManager,
             Map<String, String> configuration) throws AsterixException {
         IDataParserFactory parserFactory;
-        String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+        String parserFactoryName = configuration.get(ExternalDataConstants.KEY_PARSER);
         if (ExternalDataUtils.isExternal(parserFactoryName)) {
             return ExternalDataUtils.createExternalParserFactory(libraryManager,
                     ExternalDataUtils.getDataverse(configuration), parserFactoryName);
         } else {
-            String parserFactoryKey = ExternalDataUtils.getRecordFormat(configuration);
-            if (parserFactoryKey == null) {
-                parserFactoryKey = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
-            }
+            String parserFactoryKey = ExternalDataUtils.getParserFactory(configuration);
             parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
         }
         return parserFactory;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
index e222e99..8181262 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.util;
 
-import java.util.Map;
-
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IDataParserFactory;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
@@ -30,6 +28,9 @@
 
 public class ExternalDataCompatibilityUtils {
 
+    private ExternalDataCompatibilityUtils() {
+    }
+
     public static void validateCompatibility(IExternalDataSourceFactory dataSourceFactory,
             IDataParserFactory dataParserFactory) throws AsterixException {
         if (dataSourceFactory.getDataSourceType() != dataParserFactory.getDataSourceType()) {
@@ -58,16 +59,4 @@
                             + recordParserFactory.getRecordClass());
         }
     }
-
-    public static void prepare(String adapterName, Map<String, String> configuration) {
-        if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
-            configuration.put(ExternalDataConstants.KEY_READER, adapterName);
-        }
-        if (!configuration.containsKey(ExternalDataConstants.KEY_PARSER)) {
-            if (configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
-                configuration.put(ExternalDataConstants.KEY_PARSER,
-                        configuration.get(ExternalDataConstants.KEY_FORMAT));
-            }
-        }
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 26f5402..a05ad77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -18,7 +18,15 @@
  */
 package org.apache.asterix.external.util;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 public class ExternalDataConstants {
+
+    private ExternalDataConstants() {
+    }
+
     // TODO: Remove unused variables.
     /**
      * Keys
@@ -62,6 +70,7 @@
     public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
     public static final String KEY_FORMAT = "format";
     public static final String KEY_QUOTE = "quote";
+    public static final String KEY_QUOTE_ESCAPE = "quote-escape";
     public static final String KEY_PARSER = "parser";
     public static final String KEY_DATASET_RECORD = "dataset-record";
     public static final String KEY_HIVE_SERDE = "hive-serde";
@@ -163,6 +172,26 @@
     public static final String FORMAT_LINE_SEPARATED = "line-separated";
     public static final String FORMAT_HDFS_WRITABLE = "hdfs-writable";
     public static final String FORMAT_KV = "kv";
+    public static final String FORMAT_CSV = "csv";
+    public static final String FORMAT_TSV = "tsv";
+    public static final Set<String> ALL_FORMATS;
+    static {
+        Set<String> formats = new HashSet<>(13);
+        formats.add(FORMAT_HIVE);
+        formats.add(FORMAT_BINARY);
+        formats.add(FORMAT_ADM);
+        formats.add(FORMAT_JSON_LOWER_CASE);
+        formats.add(FORMAT_DELIMITED_TEXT);
+        formats.add(FORMAT_TWEET);
+        formats.add(FORMAT_RSS);
+        formats.add(FORMAT_SEMISTRUCTURED);
+        formats.add(FORMAT_LINE_SEPARATED);
+        formats.add(FORMAT_HDFS_WRITABLE);
+        formats.add(FORMAT_KV);
+        formats.add(FORMAT_CSV);
+        formats.add(FORMAT_TSV);
+        ALL_FORMATS = Collections.unmodifiableSet(formats);
+    }
 
     /**
      * input streams
@@ -188,6 +217,8 @@
      */
     public static final String TRUE = "true";
     public static final String FALSE = "false";
+    public static final String TAB_STR = "\t";
+    public static final String NULL_STR = "\0";
 
     /**
      * Constant characters
@@ -227,7 +258,6 @@
     public static final String EXTERNAL = "external";
     public static final String KEY_READER_FACTORY = "reader-factory";
     public static final String READER_RSS = "rss_feed";
-    public static final String FORMAT_CSV = "csv";
 
     public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
 
@@ -238,8 +268,5 @@
         public static final String CONTAINER_NAME_FIELD_NAME = "container";
         public static final String DEFINITION_FIELD_NAME = "definition";
         public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
-        public static final String[] REQUIRED_LINK_PARAMETERS =
-                new String[] { ACCESS_KEY_FIELD_NAME, SECRET_KEY_FIELD_NAME, REGION_FIELD_NAME };
-        public static final String[] OPTIONAL_LINK_PARAMETERS = new String[] { SERVICE_END_POINT_FIELD_NAME };
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 8aebd90..652eae5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.om.types.AUnionType;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -47,46 +48,49 @@
 
 public class ExternalDataUtils {
 
+    private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
+    static {
+        valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+        valueParserFactoryMap.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
+    }
+
+    private ExternalDataUtils() {
+    }
+
     // Get a delimiter from the given configuration
-    public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
+    public static char validateGetDelimiter(Map<String, String> configuration) throws HyracksDataException {
         String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
         if (delimiterValue == null) {
-            delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
-        } else if (delimiterValue.length() != 1) {
-            throw new AsterixException(
-                    "'" + delimiterValue + "' is not a valid delimiter. The length of a delimiter should be 1.");
+            return ExternalDataConstants.DEFAULT_DELIMITER.charAt(0);
         }
+        validateDelimiter(delimiterValue);
         return delimiterValue.charAt(0);
     }
 
     // Get a quote from the given configuration when the delimiter is given
     // Need to pass delimiter to check whether they share the same character
-    public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
+    public static char validateGetQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
         String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
         if (quoteValue == null) {
-            quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
-        } else if (quoteValue.length() != 1) {
-            throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
+            return ExternalDataConstants.DEFAULT_QUOTE.charAt(0);
         }
-
-        // Since delimiter (char type value) can't be null,
-        // we only check whether delimiter and quote use the same character
-        if (quoteValue.charAt(0) == delimiter) {
-            throw new AsterixException(
-                    "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. ");
-        }
-
-        return quoteValue.charAt(0);
+        validateQuote(quoteValue);
+        char quote = quoteValue.charAt(0);
+        validateDelimiterAndQuote(delimiter, quote);
+        return quote;
     }
 
-    // Get the header flag
-    public static boolean getHasHeader(Map<String, String> configuration) {
-        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_HEADER));
-    }
-
-    public static void validateParameters(Map<String, String> configuration) throws AsterixException {
-        validateDataSourceParameters(configuration);
-        validateDataParserParameters(configuration);
+    public static char validateGetQuoteEscape(Map<String, String> configuration) throws HyracksDataException {
+        String quoteEscapeValue = configuration.get(ExternalDataConstants.KEY_QUOTE_ESCAPE);
+        if (quoteEscapeValue == null) {
+            return ExternalDataConstants.ESCAPE;
+        }
+        validateQuoteEscape(quoteEscapeValue);
+        return quoteEscapeValue.charAt(0);
     }
 
     public static void validateDataParserParameters(Map<String, String> configuration) throws AsterixException {
@@ -94,8 +98,8 @@
         if (parser == null) {
             String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
             if (parserFactory == null) {
-                throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " or "
-                        + ExternalDataConstants.KEY_PARSER_FACTORY + " must be specified.");
+                throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED,
+                        ExternalDataConstants.KEY_FORMAT + " or " + ExternalDataConstants.KEY_PARSER_FACTORY);
             }
         }
     }
@@ -103,7 +107,7 @@
     public static void validateDataSourceParameters(Map<String, String> configuration) throws AsterixException {
         String reader = configuration.get(ExternalDataConstants.KEY_READER);
         if (reader == null) {
-            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
+            throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_READER);
         }
     }
 
@@ -149,30 +153,13 @@
         return DataverseName.createFromCanonicalForm(configuration.get(ExternalDataConstants.KEY_DATAVERSE));
     }
 
-    public static String getRecordFormat(Map<String, String> configuration) {
-        String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
-        return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
-    }
-
-    public static void setRecordFormat(Map<String, String> configuration, String format) {
-        if (!configuration.containsKey(ExternalDataConstants.KEY_DATA_PARSER)) {
-            configuration.put(ExternalDataConstants.KEY_DATA_PARSER, format);
+    public static String getParserFactory(Map<String, String> configuration) {
+        String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER);
+        if (parserFactory != null) {
+            return parserFactory;
         }
-        if (!configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
-            configuration.put(ExternalDataConstants.KEY_FORMAT, format);
-        }
-    }
-
-    private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
-
-    private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
-        Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class);
-        m.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
-        m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
-        m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
-        m.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
-        m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-        return m;
+        parserFactory = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        return parserFactory != null ? parserFactory : configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
     }
 
     public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
@@ -205,10 +192,6 @@
         return vpf;
     }
 
-    public static String getRecordReaderStreamName(Map<String, String> configuration) {
-        return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
-    }
-
     public static boolean hasHeader(Map<String, String> configuration) {
         String value = configuration.get(ExternalDataConstants.KEY_HEADER);
         if (value != null) {
@@ -294,12 +277,6 @@
         return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
     }
 
-    public static int getQueueSize(Map<String, String> configuration) {
-        return configuration.containsKey(ExternalDataConstants.KEY_QUEUE_SIZE)
-                ? Integer.parseInt(configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE))
-                : ExternalDataConstants.DEFAULT_QUEUE_SIZE;
-    }
-
     public static boolean isRecordWithMeta(Map<String, String> configuration) {
         return configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME);
     }
@@ -319,8 +296,7 @@
     public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException {
         String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
         if (keyIndexes == null) {
-            throw new AsterixException(
-                    "A change feed must have the parameter " + ExternalDataConstants.KEY_KEY_INDEXES);
+            throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_KEY_INDEXES);
         }
         return keyIndexes.split(",").length;
     }
@@ -352,4 +328,120 @@
         }
         return intIndicators;
     }
+
+    /**
+     * Fills the configuration of the external dataset and its adapter with default values if not provided by user.
+     *
+     * @param configuration external data configuration
+     */
+    public static void defaultConfiguration(Map<String, String> configuration) {
+        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        if (format != null) {
+            // default quote, escape character for quote and fields delimiter for csv and tsv format
+            if (format.equals(ExternalDataConstants.FORMAT_CSV)) {
+                configuration.putIfAbsent(ExternalDataConstants.KEY_DELIMITER, ExternalDataConstants.DEFAULT_DELIMITER);
+                configuration.putIfAbsent(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE);
+                configuration.putIfAbsent(ExternalDataConstants.KEY_QUOTE_ESCAPE, ExternalDataConstants.DEFAULT_QUOTE);
+            } else if (format.equals(ExternalDataConstants.FORMAT_TSV)) {
+                configuration.putIfAbsent(ExternalDataConstants.KEY_DELIMITER, ExternalDataConstants.TAB_STR);
+                configuration.putIfAbsent(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.NULL_STR);
+                configuration.putIfAbsent(ExternalDataConstants.KEY_QUOTE_ESCAPE, ExternalDataConstants.NULL_STR);
+            }
+        }
+    }
+
+    /**
+     * Prepares the configuration of the external data and its adapter by filling the information required by
+     * adapters and parsers.
+     *
+     * @param adapterName adapter name
+     * @param configuration external data configuration
+     */
+    public static void prepare(String adapterName, Map<String, String> configuration) {
+        if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
+            configuration.put(ExternalDataConstants.KEY_READER, adapterName);
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_PARSER)
+                && configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
+            configuration.put(ExternalDataConstants.KEY_PARSER, configuration.get(ExternalDataConstants.KEY_FORMAT));
+        }
+    }
+
+    /**
+     * Normalizes the values of certain parameters of the adapter configuration. This should happen before persisting
+     * the metadata (e.g. when creating external datasets or feeds) and when creating an adapter factory. Values are
+     * lower-cased with Locale.ROOT so that the result does not depend on the JVM default locale (e.g. Turkish 'I').
+     * @param configuration external data configuration; its "format" and "header" entries are updated in place
+     */
+    public static void normalize(Map<String, String> configuration) {
+        // "format": lower-cased and trimmed; only written back when the result is one of the known formats
+        String paramValue = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        if (paramValue != null) {
+            String lowerCaseFormat = paramValue.toLowerCase(java.util.Locale.ROOT).trim();
+            if (ExternalDataConstants.ALL_FORMATS.contains(lowerCaseFormat)) {
+                configuration.put(ExternalDataConstants.KEY_FORMAT, lowerCaseFormat);
+            }
+        }
+        // "header": when present, always replaced by its lower-cased, trimmed value
+        paramValue = configuration.get(ExternalDataConstants.KEY_HEADER);
+        if (paramValue != null) {
+            configuration.put(ExternalDataConstants.KEY_HEADER, paramValue.toLowerCase(java.util.Locale.ROOT).trim());
+        }
+    }
+
+    /**
+     * Validates the parameter values of the adapter configuration. This should happen after normalizing the values.
+     *
+     * @param configuration external data configuration
+     * @throws HyracksDataException if a required parameter is missing or a parameter value is invalid
+     */
+    public static void validate(Map<String, String> configuration) throws HyracksDataException {
+        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        String header = configuration.get(ExternalDataConstants.KEY_HEADER);
+        if (format != null && isHeaderRequiredFor(format) && header == null) {
+            throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_HEADER);
+        }
+        if (header != null && !isBoolean(header)) {
+            throw new RuntimeDataException(ErrorCode.INVALID_REQ_PARAM_VAL, ExternalDataConstants.KEY_HEADER, header);
+        }
+        char delimiter = validateGetDelimiter(configuration);
+        validateGetQuote(configuration, delimiter);
+        validateGetQuoteEscape(configuration);
+    }
+
+    private static boolean isHeaderRequiredFor(String format) {
+        return format.equals(ExternalDataConstants.FORMAT_CSV) || format.equals(ExternalDataConstants.FORMAT_TSV);
+    }
+
+    private static boolean isBoolean(String value) {
+        return value.equals(ExternalDataConstants.TRUE) || value.equals(ExternalDataConstants.FALSE);
+    }
+
+    private static void validateDelimiter(String delimiter) throws RuntimeDataException {
+        if (delimiter.length() != 1) {
+            throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER,
+                    delimiter);
+        }
+    }
+
+    public static void validateQuote(String quote) throws RuntimeDataException {
+        if (quote.length() != 1) {
+            throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quote,
+                    ExternalDataConstants.KEY_QUOTE);
+        }
+    }
+
+    private static void validateQuoteEscape(String quoteEsc) throws RuntimeDataException {
+        if (quoteEsc.length() != 1) {
+            throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quoteEsc,
+                    ExternalDataConstants.KEY_QUOTE_ESCAPE);
+        }
+    }
+
+    private static void validateDelimiterAndQuote(char delimiter, char quote) throws RuntimeDataException {
+        if (quote == delimiter) {
+            throw new RuntimeDataException(
+                    ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quote, delimiter);
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
index 2ba5a3e..2972542 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
@@ -41,6 +41,8 @@
 import org.apache.asterix.external.input.stream.LocalFSInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -56,6 +58,7 @@
     private final CharBuffer chars = CharBuffer.allocate(BUFFER_SIZE);
     private final CharArrayRecord value = new CharArrayRecord();
     private final ByteBuf nettyBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(KB32, Integer.MAX_VALUE);
+    private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
 
     @Test
     public void eatGlass() {
@@ -83,7 +86,7 @@
         FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
         LocalFSInputStream in = new LocalFSInputStream(watcher);
         try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader()) {
-            recordReader.configure(in, config);
+            recordReader.configure(ctx, in, config);
             while (recordReader.hasNext()) {
                 try {
                     IRawRecord<char[]> record = recordReader.next();
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 1584065..47c6ffe 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -44,12 +44,15 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Assert;
 
 public class RecordWithMetaTest {
+    private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
     private static ARecordType recordType;
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -90,7 +93,7 @@
             config.put(ExternalDataConstants.KEY_HEADER, "true");
             config.put(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE);
             LineRecordReader lineReader = new LineRecordReader();
-            lineReader.configure(inputStream, config);
+            lineReader.configure(ctx, inputStream, config);
             // create csv with json record reader
             CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter(
                     valueIndex, delimiter, metaType, recordType, pkIndicators, pkIndexes, keyTypes);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 64b703b..f0567a4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -22,6 +22,7 @@
 import java.io.DataOutput;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -64,6 +65,7 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.compression.CompressionManager;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -115,20 +117,9 @@
         String nodeGroupName =
                 ((AString) datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX))
                         .getStringValue();
-        String compactionPolicy = ((AString) datasetRecord
-                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue();
-        IACursor cursor = ((AOrderedList) datasetRecord
-                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX))
-                        .getCursor();
-        Map<String, String> compactionPolicyProperties = new LinkedHashMap<>();
-        while (cursor.next()) {
-            ARecord field = (ARecord) cursor.get();
-            String key =
-                    ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
-            String value =
-                    ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
-            compactionPolicyProperties.put(key, value);
-        }
+
+        Pair<String, Map<String, String>> compactionPolicy = readCompactionPolicy(datasetType, datasetRecord);
+
         switch (datasetType) {
             case INTERNAL: {
                 ARecord datasetDetailsRecord = (ARecord) datasetRecord
@@ -139,7 +130,7 @@
                 PartitioningStrategy partitioningStrategy = PartitioningStrategy.valueOf(((AString) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONSTRATEGY_FIELD_INDEX))
                                 .getStringValue());
-                cursor = ((AOrderedList) datasetDetailsRecord
+                IACursor cursor = ((AOrderedList) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONKEY_FIELD_INDEX))
                                 .getCursor();
                 List<List<String>> partitioningKey = new ArrayList<>();
@@ -199,7 +190,7 @@
                 String adapter = ((AString) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_DATASOURCE_ADAPTER_FIELD_INDEX))
                                 .getStringValue();
-                cursor = ((AOrderedList) datasetDetailsRecord
+                IACursor cursor = ((AOrderedList) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX))
                                 .getCursor();
                 Map<String, String> properties = new HashMap<>();
@@ -242,10 +233,34 @@
         String compressionScheme = getCompressionScheme(datasetRecord);
 
         return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName,
-                nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType,
+                nodeGroupName, compactionPolicy.first, compactionPolicy.second, datasetDetails, hints, datasetType,
                 datasetId, pendingOp, rebalanceCount, compressionScheme);
     }
 
+    protected Pair<String, Map<String, String>> readCompactionPolicy(DatasetType datasetType, ARecord datasetRecord) {
+        // Reads the policy name and its properties from datasetRecord; datasetType is unused here.
+        String compactionPolicy = ((AString) datasetRecord
+                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue();
+        AOrderedList compactionPolicyPropertiesList = ((AOrderedList) datasetRecord
+                .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX));
+        // An empty list yields the immutable Collections.emptyMap(); otherwise a LinkedHashMap keeps cursor order.
+        Map<String, String> compactionPolicyProperties;
+        if (compactionPolicyPropertiesList.size() > 0) {
+            compactionPolicyProperties = new LinkedHashMap<>();
+            for (IACursor cursor = compactionPolicyPropertiesList.getCursor(); cursor.next();) {
+                ARecord field = (ARecord) cursor.get();
+                String key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
+                        .getStringValue();
+                String value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
+                        .getStringValue();
+                compactionPolicyProperties.put(key, value);
+            }
+        } else {
+            compactionPolicyProperties = Collections.emptyMap();
+        }
+        return new Pair<>(compactionPolicy, compactionPolicyProperties);
+    }
+
     private long getRebalanceCount(ARecord datasetRecord) {
         // Read the rebalance count if there is one.
         int rebalanceCountIndex =
@@ -323,29 +338,9 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX, fieldValue);
 
-        // write field 6
-        fieldValue.reset();
-        aString.setValue(dataset.getCompactionPolicy());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue);
-
-        // write field 7
-        listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE
-                .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]);
-        if (dataset.getCompactionPolicyProperties() != null) {
-            for (Map.Entry<String, String> property : dataset.getCompactionPolicyProperties().entrySet()) {
-                String name = property.getKey();
-                String value = property.getValue();
-                itemValue.reset();
-                DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(),
-                        MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
-                listBuilder.addItem(itemValue);
-            }
-        }
-        fieldValue.reset();
-        listBuilder.write(fieldValue.getDataOutput(), true);
-        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX,
-                fieldValue);
+        // write field 6/7
+        writeCompactionPolicy(dataset.getDatasetType(), dataset.getCompactionPolicy(),
+                dataset.getCompactionPolicyProperties(), listBuilder, itemValue);
 
         // write field 8/9
         fieldValue.reset();
@@ -395,6 +390,34 @@
         return tuple;
     }
 
+    protected void writeCompactionPolicy(DatasetType datasetType, String compactionPolicy,
+            Map<String, String> compactionPolicyProperties, OrderedListBuilder listBuilder,
+            ArrayBackedValueStorage itemValue) throws HyracksDataException {
+        // write field 6: the compaction policy name (datasetType is not used here)
+        fieldValue.reset();
+        aString.setValue(compactionPolicy);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue);
+
+        // write field 7: the policy properties, one name/value record per map entry
+        listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE
+                .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]);
+        if (compactionPolicyProperties != null && !compactionPolicyProperties.isEmpty()) { // else: no items added
+            for (Map.Entry<String, String> property : compactionPolicyProperties.entrySet()) {
+                String name = property.getKey();
+                String value = property.getValue();
+                itemValue.reset();
+                DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(),
+                        MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
+                listBuilder.addItem(itemValue);
+            }
+        }
+        fieldValue.reset();
+        listBuilder.write(fieldValue.getDataOutput(), true);
+        recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX,
+                fieldValue);
+    }
+
     /**
      * Keep protected to allow other extensions to add additional fields
      */
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 09eaa8b..e0a8e83 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -143,13 +143,13 @@
                     default:
                         throw new AsterixException("Unknown Adapter type " + adapterType);
                 }
-                adapterFactory.setOutputType(adapterOutputType);
-                adapterFactory.setMetaType(metaType);
-                adapterFactory.configure(appCtx.getServiceContext(), configuration);
             } else {
-                AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(), adapterName, configuration,
-                        adapterOutputType, metaType);
+                ExternalDataUtils.prepare(adapterName, configuration);
+                adapterFactory = (ITypedAdapterFactory) appCtx.getAdapterFactoryService().createAdapterFactory();
             }
+            adapterFactory.setOutputType(adapterOutputType);
+            adapterFactory.setMetaType(metaType);
+            adapterFactory.configure(appCtx.getServiceContext(), configuration);
             if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
                 metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
                 if (metaType == null) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
index f7bddf1..fb71eae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
@@ -116,6 +116,10 @@
                         int itemIndex = ATypeHierarchy.getIntegerValue(BuiltinFunctions.GET_ITEM.getName(), 0,
                                 indexBytes, indexOffset);
 
+                        if (itemIndex < 0) {
+                            itemIndex = itemCount + itemIndex;
+                        }
+
                         if (itemIndex < 0 || itemIndex >= itemCount) {
                             // Out-of-bound index access should return MISSING.
                             result.set(missingBytes, 0, 1);
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index b660379..b794d73 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -169,8 +169,8 @@
               <url>https://raw.githubusercontent.com/mojohaus/appassembler/appassembler-2.0.0/LICENSE.txt</url>
             </override>
             <override>
-              <gav>io.netty:netty-all:4.1.46.Final</gav>
-              <noticeUrl>https://raw.githubusercontent.com/netty/netty/netty-4.1.46.Final/NOTICE.txt</noticeUrl>
+              <gav>io.netty:netty-all:4.1.48.Final</gav>
+              <noticeUrl>https://raw.githubusercontent.com/netty/netty/netty-4.1.48.Final/NOTICE.txt</noticeUrl>
             </override>
             <override>
               <gav>org.reactivestreams:reactive-streams:1.0.2</gav>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index 812c64a..2d74604 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -160,9 +160,9 @@
       <artifactId>netty-all</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.46.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.46.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.46.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.48.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.48.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.48.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.46.Final_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.48.Final_NOTICE.txt
similarity index 100%
rename from asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.46.Final_NOTICE.txt
rename to asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.48.Final_NOTICE.txt
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
index 4deba7b..7baf268 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
@@ -37,4 +37,9 @@
     public void open(IHyracksCommonContext ctx) throws HyracksDataException;
 
     public void close() throws HyracksDataException;
+
+    /**
+     * Called when a failure is encountered while reading data
+     */
+    void fail();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index cfb6c9e..87b963a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -59,7 +59,7 @@
 40 = Unknown inverted index type %1$s
 41 = Cannot propose linearizer if dimensions have different types
 42 = Cannot propose linearizer for type %1$s
-43 = Record size (%1$s) larger than maximum acceptable record size (%2$s)
+43 = Index entry size (%1$s) larger than maximum acceptable entry size (%2$s)
 44 = Failed to re-find parent of a page in the tree
 45 = Failed to find a tuple in a page
 46 = Unsorted load input
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
index c8f5bd9..3c5a3d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -90,36 +90,43 @@
     public int read(IFrame frame) throws HyracksDataException {
         frame.reset();
         int readSize = 0;
-        if (isFirstRead() && !hasNextRecord()) {
-            return readSize;
-        }
-        // read until frame is full or all result records have been read
-        while (readSize < frame.getFrameSize()) {
-            if (currentRecordMonitor.hasMoreFrames()) {
-                final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
-                if (readBuffer == null) {
-                    throw new IllegalStateException("Unexpected empty frame");
-                }
-                currentRecordMonitor.notifyFrameRead();
-                if (readSize == 0) {
-                    final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
-                    frame.ensureFrameSize(frame.getMinSize() * nBlocks);
-                    frame.getBuffer().clear();
-                }
-                frame.getBuffer().put(readBuffer);
-                currentRecordChannel.recycleBuffer(readBuffer);
-                readSize = frame.getBuffer().position();
-            } else {
-                currentRecordChannel.close();
-                if (currentRecordMonitor.failed()) {
-                    throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
-                }
-                if (isLastRecord() || !hasNextRecord()) {
-                    break;
+        try {
+            if (isFirstRead() && !hasNextRecord()) {
+                return readSize;
+            }
+            // read until frame is full or all result records have been read
+            while (readSize < frame.getFrameSize()) {
+                if (currentRecordMonitor.hasMoreFrames()) {
+                    final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
+                    if (readBuffer == null) {
+                        throw new IllegalStateException("Unexpected empty frame");
+                    }
+                    currentRecordMonitor.notifyFrameRead();
+                    if (readSize == 0) {
+                        final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
+                        frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+                        frame.getBuffer().clear();
+                    }
+                    frame.getBuffer().put(readBuffer);
+                    currentRecordChannel.recycleBuffer(readBuffer);
+                    readSize = frame.getBuffer().position();
+                } else {
+                    currentRecordChannel.close();
+                    if (currentRecordMonitor.failed()) {
+                        throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
+                    }
+                    if (isLastRecord() || !hasNextRecord()) {
+                        break;
+                    }
                 }
             }
+            frame.getBuffer().flip();
+        } catch (Exception e) {
+            if (isLocalFailure()) {
+                currentRecordChannel.fail();
+            }
+            throw e;
         }
-        frame.getBuffer().flip();
         return readSize;
     }
 
@@ -201,6 +208,10 @@
         return knownRecords != null && currentRecord == knownRecords.length - 1;
     }
 
+    private boolean isLocalFailure() { // true when a monitor exists and has not itself reported a failure
+        return currentRecordMonitor != null && !currentRecordMonitor.failed();
+    }
+
     private static class ResultInputChannelMonitor implements IInputChannelMonitor {
 
         private int availableFrames;
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 58664c6..53bb7cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -119,6 +119,11 @@
 
     }
 
+    @Override
+    public void fail() {
+        // do nothing (covered by job lifecycle)
+    }
+
     private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
index 1df39e9..38cf7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -127,6 +128,11 @@
 
     }
 
+    @Override
+    public void fail() { // pushes REMOTE_ERROR_CODE to the write interface's full-buffer acceptor
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+    }
+
     private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 83677f8..2a40eb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -99,6 +99,11 @@
 
     }
 
+    @Override
+    public void fail() {
+        // do nothing (covered by job lifecycle)
+    }
+
     private class FrameWriter implements IFrameWriter {
         @Override
         public void open() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/parsers/BooleanParserFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/parsers/BooleanParserFactory.java
new file mode 100644
index 0000000..488be04
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/parsers/BooleanParserFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/** Singleton factory of parsers that turn a case-insensitive "true"/"false" literal into a boolean. */
+public class BooleanParserFactory implements IValueParserFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IValueParserFactory INSTANCE = new BooleanParserFactory();
+
+    private BooleanParserFactory() {
+    }
+
+    @Override
+    public IValueParser createValueParser() {
+        return BooleanParserFactory::parse;
+    }
+
+    /** Writes to {@code out} the boolean spelled, in any letter case, by the field at {@code start}. */
+    public static void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+        final boolean value = spells(buffer, start, length, "true", "TRUE");
+        if (!value && !spells(buffer, start, length, "false", "FALSE")) {
+            throw new HyracksDataException("Invalid input data");
+        }
+        try {
+            out.writeBoolean(value);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /** Tells whether the field is as long as the literal and every char equals its lower- or upper-case letter. */
+    private static boolean spells(char[] buffer, int start, int length, String lower, String upper) {
+        boolean same = length == lower.length();
+        for (int i = 0; same && i < length; i++) {
+            same = buffer[start + i] == lower.charAt(i) || buffer[start + i] == upper.charAt(i);
+        }
+        return same;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 9ddb4c2..2eb882a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -76,12 +76,11 @@
                                 break;
                             }
                             // Eliminate double quotes in the field that we are going to parse
-                            if (cursor.isDoubleQuoteIncludedInThisField) {
-                                cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
-                                cursor.fEnd -= cursor.doubleQuoteCount;
-                                cursor.isDoubleQuoteIncludedInThisField = false;
+                            if (cursor.fieldHasDoubleQuote()) {
+                                cursor.eliminateDoubleQuote();
                             }
-                            valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
+                            valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength(),
+                                    dos);
                             tb.addFieldEndOffset();
                         }
                         FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index 7e5ee2c..fd3e4c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -32,18 +32,18 @@
         EOF //end of stream reached
     }
 
-    public char[] buffer; //buffer to holds the input coming form the underlying input stream
-    public int fStart; //start position for field
-    public int fEnd; //end position for field
-    public int recordCount; //count of records
-    public int fieldCount; //count of fields in current record
-    public int doubleQuoteCount; //count of double quotes
-    public boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes
+    private char[] buffer; //buffer that holds the input coming from the underlying input stream
+    private int fStart; //start position for field
+    private int fEnd; //end position for field (exclusive; field length is fEnd - fStart)
+    private int recordCount; //count of records
+    private int fieldCount; //count of fields in current record
+    private int doubleQuoteCount; //count of escaped quotes ("") in current field
+    private boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes
 
     private static final int INITIAL_BUFFER_SIZE = 4096;//initial buffer size
     private static final int INCREMENT = 4096; //increment size
 
-    private Reader in; //the underlying buffer
+    private final Reader in; //the underlying reader; readMore() treats null as end of input
 
     private int start; //start of valid buffer area
     private int end; //end of valid buffer area
@@ -55,8 +55,8 @@
     private int quoteCount; //count of single quotes
     private boolean startedQuote; //whether a quote has been started
 
-    private char quote; //the quote character
-    private char fieldDelimiter; //the delimiter
+    private final char quote; //the quote character
+    private final char fieldDelimiter; //the delimiter
 
     public FieldCursorForDelimitedDataParser(Reader in, char fieldDelimiter, char quote) {
         this.in = in;
@@ -70,9 +70,9 @@
         state = State.INIT;
         this.quote = quote;
         this.fieldDelimiter = fieldDelimiter;
-        lastDelimiterPosition = -99;
-        lastQuotePosition = -99;
-        lastDoubleQuotePosition = -99;
+        lastDelimiterPosition = -1;
+        lastQuotePosition = -1;
+        lastDoubleQuotePosition = -1;
         quoteCount = 0;
         doubleQuoteCount = 0;
         startedQuote = false;
@@ -81,9 +81,44 @@
         fieldCount = 0;
     }
 
-    public void nextRecord(char[] buffer, int recordLength) throws IOException {
+    public char[] getBuffer() { // the internal buffer itself, not a copy
+        return buffer;
+    }
+
+    public int getFieldStart() { // offset into getBuffer() where the current field begins
+        return fStart;
+    }
+
+    public int getFieldLength() { // number of chars between field start and field end
+        return fEnd - fStart;
+    }
+
+    public boolean isFieldEmpty() { // true when the current field has zero length
+        return fStart == fEnd;
+    }
+
+    public boolean fieldHasDoubleQuote() { // escaped quotes pending; cleared by eliminateDoubleQuote()
+        return isDoubleQuoteIncludedInThisField;
+    }
+
+    public int getFieldCount() { // fields read so far in the current record
+        return fieldCount;
+    }
+
+    public int getRecordCount() { // count of records seen so far
+        return recordCount;
+    }
+
+    public void nextRecord(char[] buffer, int recordLength) {
         recordCount++;
         fieldCount = 0;
+        lastDelimiterPosition = -1;
+        lastQuotePosition = -1;
+        lastDoubleQuotePosition = -1;
+        quoteCount = 0;
+        doubleQuoteCount = 0;
+        startedQuote = false;
+        isDoubleQuoteIncludedInThisField = false;
         start = 0;
         end = recordLength;
         state = State.IN_RECORD;
@@ -187,7 +222,6 @@
     }
 
     public boolean nextField() throws IOException {
-        fieldCount++;
         switch (state) {
             case INIT:
             case EOR:
@@ -196,12 +230,12 @@
                 return false;
 
             case IN_RECORD:
-                boolean eof;
+                fieldCount++;
                 // reset quote related values
                 startedQuote = false;
                 isDoubleQuoteIncludedInThisField = false;
-                lastQuotePosition = -99;
-                lastDoubleQuotePosition = -99;
+                lastQuotePosition = -1;
+                lastDoubleQuotePosition = -1;
                 quoteCount = 0;
                 doubleQuoteCount = 0;
 
@@ -209,21 +243,26 @@
                 while (true) {
                     if (p >= end) {
                         int s = start;
-                        eof = !readMore();
+                        boolean eof = !readMore();
                         p -= (s - start);
-                        lastQuotePosition -= (s - start);
-                        lastDoubleQuotePosition -= (s - start);
-                        lastDelimiterPosition -= (s - start);
+                        lastQuotePosition -= (lastQuotePosition > -1) ? (s - start) : 0;
+                        lastDoubleQuotePosition -= (lastDoubleQuotePosition > -1) ? (s - start) : 0;
+                        lastDelimiterPosition -= (lastDelimiterPosition > -1) ? (s - start) : 0;
                         if (eof) {
                             state = State.EOF;
-                            if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
-                                    && quoteCount == doubleQuoteCount * 2 + 2) {
-                                // set the position of fStart to +1, fEnd to -1 to remove quote character
-                                fStart = start + 1;
-                                fEnd = p - 1;
-                            } else {
+                            if (!startedQuote) {
                                 fStart = start;
                                 fEnd = p;
+                            } else {
+                                if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+                                        && quoteCount == doubleQuoteCount * 2 + 2) {
+                                    // set the position of fStart to +1, fEnd to -1 to remove quote character
+                                    fStart = start + 1;
+                                    fEnd = p - 1;
+                                } else {
+                                    throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
+                                            + " - missing a closing quote");
+                                }
                             }
                             return true;
                         }
@@ -232,12 +271,12 @@
                     if (ch == quote) {
                         // If this is first quote in the field, then it needs to be placed in the beginning.
                         if (!startedQuote) {
-                            if (lastDelimiterPosition == p - 1 || lastDelimiterPosition == -99) {
+                            if (p == start) {
                                 startedQuote = true;
                             } else {
                                 // In this case, we don't have a quote in the beginning of a field.
                                 throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
-                                        + " - a quote enclosing a field needs to be placed in the beginning of that field.");
+                                        + " - a quote enclosing a field needs to be placed in the beginning of that field");
                             }
                         }
                         // Check double quotes - "". We check [start != p-2]
@@ -245,8 +284,8 @@
                         // since it looks like a double quote. However, it's not a double quote.
                         // (e.g. if field2 has no value:
                         //       field1,"",field3 ... )
-                        if (lastQuotePosition == p - 1 && lastDelimiterPosition != p - 2
-                                && lastDoubleQuotePosition != p - 1) {
+                        if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+                                && lastQuotePosition != start) {
                             isDoubleQuoteIncludedInThisField = true;
                             doubleQuoteCount++;
                             lastDoubleQuotePosition = p;
@@ -262,64 +301,46 @@
                             start = p + 1;
                             lastDelimiterPosition = p;
                             return true;
-                        } else if (startedQuote) {
-                            if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1) {
-                                // There is a quote right before the delimiter (e.g. ",)  and it is not two quote,
-                                // then the field contains a valid string.
-                                // We set the position of fStart to +1, fEnd to -1 to remove quote character
-                                fStart = start + 1;
-                                fEnd = p - 1;
-                                start = p + 1;
-                                lastDelimiterPosition = p;
-                                startedQuote = false;
-                                return true;
-                            } else if (lastQuotePosition < p - 1 && lastQuotePosition != lastDoubleQuotePosition
-                                    && quoteCount == doubleQuoteCount * 2 + 2) {
-                                // There is a quote before the delimiter, however it is not directly placed before the delimiter.
-                                // In this case, we throw an exception.
-                                // quoteCount == doubleQuoteCount * 2 + 2 : only true when we have two quotes except double-quotes.
-                                throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
-                                        + " -  A quote enclosing a field needs to be followed by the delimiter.");
-                            }
+                        }
+
+                        if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+                                && lastQuotePosition != start) {
+                            // There is a quote right before the delimiter (e.g. ",)  and it is not two quote,
+                            // then the field contains a valid string.
+                            // We set the position of fStart to +1, fEnd to -1 to remove quote character
+                            fStart = start + 1;
+                            fEnd = p - 1;
+                            start = p + 1;
+                            lastDelimiterPosition = p;
+                            startedQuote = false;
+                            return true;
+                        } else if (lastQuotePosition < p - 1 && lastQuotePosition != lastDoubleQuotePosition
+                                && quoteCount == doubleQuoteCount * 2 + 2) {
+                            // There is a quote before the delimiter, however it is not directly placed before the delimiter.
+                            // In this case, we throw an exception.
+                            // quoteCount == doubleQuoteCount * 2 + 2 : only true when we have two quotes except double-quotes.
+                            throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
+                                    + " -  A quote enclosing a field needs to be followed by the delimiter.");
                         }
                         // If the control flow reaches here: we have a delimiter in this field and
                         // there should be a quote in the beginning and the end of
                         // this field. So, just continue reading next character
-                    } else if (ch == '\n') {
+                    } else if (ch == '\n' || ch == '\r') {
                         if (!startedQuote) {
                             fStart = start;
                             fEnd = p;
                             start = p + 1;
-                            state = State.EOR;
+                            state = ch == '\n' ? State.EOR : State.CR;
                             lastDelimiterPosition = p;
                             return true;
-                        } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+                        } else if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
                                 && quoteCount == doubleQuoteCount * 2 + 2) {
                             // set the position of fStart to +1, fEnd to -1 to remove quote character
                             fStart = start + 1;
                             fEnd = p - 1;
                             lastDelimiterPosition = p;
                             start = p + 1;
-                            state = State.EOR;
-                            startedQuote = false;
-                            return true;
-                        }
-                    } else if (ch == '\r') {
-                        if (!startedQuote) {
-                            fStart = start;
-                            fEnd = p;
-                            start = p + 1;
-                            state = State.CR;
-                            lastDelimiterPosition = p;
-                            return true;
-                        } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
-                                && quoteCount == doubleQuoteCount * 2 + 2) {
-                            // set the position of fStart to +1, fEnd to -1 to remove quote character
-                            fStart = start + 1;
-                            fEnd = p - 1;
-                            lastDelimiterPosition = p;
-                            start = p + 1;
-                            state = State.CR;
+                            state = ch == '\n' ? State.EOR : State.CR;
                             startedQuote = false;
                             return true;
                         }
@@ -330,7 +351,10 @@
         throw new IllegalStateException();
     }
 
-    protected boolean readMore() throws IOException {
+    private boolean readMore() throws IOException {
+        if (in == null) {
+            return false;
+        }
         if (start > 0) {
             System.arraycopy(buffer, start, buffer, 0, end - start);
         }
@@ -350,10 +374,11 @@
     }
 
     // Eliminate escaped double quotes("") in a field
-    public void eliminateDoubleQuote(char[] buffer, int start, int length) {
-        int lastDoubleQuotePosition = -99;
-        int writepos = start;
-        int readpos = start;
+    public void eliminateDoubleQuote() {
+        int lastDoubleQuotePosition = -1;
+        int writepos = fStart;
+        int readpos = fStart;
+        int length = fEnd - fStart;
         // Find positions where double quotes appear
         for (int i = 0; i < length; i++) {
             // Skip double quotes
@@ -369,5 +394,7 @@
                 readpos++;
             }
         }
+        fEnd -= doubleQuoteCount;
+        isDoubleQuoteIncludedInThisField = false;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
index e663179..8edcafc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
@@ -51,9 +51,8 @@
             while (cursor.nextRecord()) {
                 int fieldNumber = 0;
                 while (cursor.nextField()) {
-                    if (cursor.isDoubleQuoteIncludedInThisField) {
-                        cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
-                        cursor.fEnd -= cursor.doubleQuoteCount;
+                    if (cursor.fieldHasDoubleQuote()) {
+                        cursor.eliminateDoubleQuote();
                     }
                     fieldNumber++;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 741ca8c..ee19de3 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -75,14 +75,14 @@
 
     @GuardedBy("ChannelControlBlock")
     private boolean computeWritability() {
-        boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+        if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+            return true; // an unsent remote error is reported writable before credits are consulted
+        }
+        boolean writableDataPresent = !ecodeSent && (currentWriteBuffer != null || !wiFullQueue.isEmpty());
         if (writableDataPresent) {
             return credits > 0;
         }
-        if (isPendingCloseWrite()) {
-            return true;
-        }
-        return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
+        return isPendingCloseWrite(); // no sendable data (or error already sent): only a pending close remains
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index a7be3a6..e542a34 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -41,7 +41,15 @@
         if (currentWriteBuffer == null) {
             currentWriteBuffer = wiFullQueue.poll();
         }
-        if (currentWriteBuffer != null) {
+        if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+            writerState.getCommand().setChannelId(channelId);
+            writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
+            writerState.getCommand().setData(ecode.get());
+            writerState.reset(null, 0, null);
+            ecodeSent = true;
+            ccb.reportLocalEOS();
+            adjustChannelWritability();
+        } else if (currentWriteBuffer != null) {
             int size = Math.min(currentWriteBuffer.remaining(), credits);
             if (size > 0) {
                 credits -= size;
@@ -55,14 +63,6 @@
             } else {
                 adjustChannelWritability();
             }
-        } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
-            writerState.getCommand().setChannelId(channelId);
-            writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
-            writerState.getCommand().setData(REMOTE_ERROR_CODE);
-            writerState.reset(null, 0, null);
-            ecodeSent = true;
-            ccb.reportLocalEOS();
-            adjustChannelWritability();
         } else if (isPendingCloseWrite()) {
             writerState.getCommand().setChannelId(channelId);
             writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 5e922b1..81e528b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -288,13 +288,17 @@
             // Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
             compressedPageWriter.abort();
             for (NodeFrontier nodeFrontier : nodeFrontiers) {
-                ICachedPage frontierPage = nodeFrontier.page;
-                if (frontierPage.confiscated()) {
-                    bufferCache.returnPage(frontierPage, false);
+                if (nodeFrontier != null && nodeFrontier.page != null) {
+                    ICachedPage frontierPage = nodeFrontier.page;
+                    if (frontierPage.confiscated()) {
+                        bufferCache.returnPage(frontierPage, false);
+                    }
                 }
             }
             for (ICachedPage pageToDiscard : pagesToWrite) {
-                bufferCache.returnPage(pageToDiscard, false);
+                if (pageToDiscard != null) {
+                    bufferCache.returnPage(pageToDiscard, false);
+                }
             }
             releasedLatches = true;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
index 9217415..3226786 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
@@ -83,22 +83,21 @@
     public void prepareWrite(ICachedPage cPage) throws HyracksDataException {
         final ICachedPageInternal internalPage = (ICachedPageInternal) cPage;
         final int entryPageId = getLAFEntryPageId(BufferedFileHandle.getPageId(internalPage.getDiskPageId()));
-
-        if (!cachedFrames.containsKey(entryPageId)) {
-            try {
-                //Writing new page(s). Confiscate the page(s) from the buffer cache.
-                prepareFrames(entryPageId, internalPage);
-            } catch (HyracksDataException e) {
-                abort();
-                throw e;
-            }
+        try {
+            // Called unconditionally: prepareFrames itself skips entry pages already present in cachedFrames.
+            prepareFrames(entryPageId, internalPage);
+        } catch (HyracksDataException e) {
+            abort();
+            throw e;
         }
     }
 
     private void prepareFrames(int entryPageId, ICachedPageInternal cPage) throws HyracksDataException {
-        //Confiscate the first page
-        confiscatePage(entryPageId);
-        //check if extra pages spans to the next entry page
+        // check if we need to confiscate a page for the main page
+        if (!cachedFrames.containsKey(entryPageId)) {
+            confiscatePage(entryPageId);
+        }
+        // check if extra pages span to the next entry page
         for (int i = 0; i < cPage.getFrameSizeMultiplier() - 1; i++) {
             final int extraEntryPageId = getLAFEntryPageId(cPage.getExtraBlockPageId() + i);
             if (!cachedFrames.containsKey(extraEntryPageId)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index e44480c..3f78234 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -51,6 +51,7 @@
 
 public class TestUtils {
 
+    private static final int DEFAULT_FRAME_SIZE = 32768;
     public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() {
         @Override
         public void warn(Warning warning) {
@@ -68,6 +69,10 @@
         }
     };
 
+    public static IHyracksTaskContext createHyracksTask() { // delegates to create(int) with DEFAULT_FRAME_SIZE
+        return create(DEFAULT_FRAME_SIZE);
+    }
+
     public static IHyracksTaskContext create(int frameSize) {
         IOManager ioManager = null;
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 1fcd806..65567b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -115,6 +115,17 @@
         return hosts;
     }
 
+    /** Returns {@code maybeHostPort} as-is if it already carries a port, else appends {@code defaultPort}. */
+    public static String defaultPort(String maybeHostPort, int defaultPort) {
+        // bracket bare IPv6 literals first so their inner colons are not taken for a port separator
+        final String host = encodeIPv6LiteralHost(maybeHostPort);
+        final int portSeparator = host.lastIndexOf(':');
+        if (portSeparator <= 0 || portSeparator <= host.lastIndexOf(']')) {
+            return host + ":" + defaultPort;
+        }
+        return maybeHostPort;
+    }
+
     public static String encodeIPv6LiteralHost(String hostname) {
         return InetAddressUtils.isIPv6Address(hostname) ? "[" + hostname + "]" : hostname;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/NetworkUtilTest.java b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/NetworkUtilTest.java
new file mode 100644
index 0000000..c5d42c5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/NetworkUtilTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NetworkUtilTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    @Test
+    public void testDefaultPort() {
+        Assert.assertEquals("127.0.0.1:1234", NetworkUtil.defaultPort("127.0.0.1:1234", 9999));
+        Assert.assertEquals("127.0.0.1:9999", NetworkUtil.defaultPort("127.0.0.1", 9999));
+        Assert.assertEquals("[::1]:1234", NetworkUtil.defaultPort("[::1]:1234", 9999));
+        Assert.assertEquals("[::1]:9999", NetworkUtil.defaultPort("::1", 9999));
+        Assert.assertEquals("[::1]:9999", NetworkUtil.defaultPort("[::1]", 9999)); // already bracketed, no port
+        Assert.assertEquals("localhost.localdomain.local:9999",
+                NetworkUtil.defaultPort("localhost.localdomain.local", 9999));
+        Assert.assertEquals("localhost.localdomain.local:1234",
+                NetworkUtil.defaultPort("localhost.localdomain.local:1234", 9999));
+    }
+}
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index bb57cd7..f10f2fa 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -82,7 +82,7 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.1.46.Final</version>
+        <version>4.1.48.Final</version>
       </dependency>
       <dependency>
         <groupId>junit</groupId>
@@ -277,6 +277,27 @@
         <artifactId>maven-plugin-api</artifactId>
         <version>3.6.3</version>
       </dependency>
+      <!-- maven-plugin-api is already managed directly above; do not declare it a second time -->
+      <dependency>
+        <groupId>org.apache.maven</groupId>
+        <artifactId>maven-model</artifactId>
+        <version>3.6.3</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.maven</groupId>
+        <artifactId>maven-artifact</artifactId>
+        <version>3.6.3</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.maven</groupId>
+        <artifactId>maven-compat</artifactId>
+        <version>3.6.3</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.maven</groupId>
+        <artifactId>maven-core</artifactId>
+        <version>3.6.3</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <build>