Merge "Merge commit 'f1cd417' from 'gerrit/mad-hatter'"
diff --git a/asterixdb/asterix-app/data/csv/01.csv b/asterixdb/asterix-app/data/csv/01.csv
new file mode 100644
index 0000000..6957e76
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/01.csv
@@ -0,0 +1,3 @@
+1,,"good","recommend"
+2,,"bad","not recommend"
+3,,"good",
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/02.csv b/asterixdb/asterix-app/data/csv/02.csv
new file mode 100644
index 0000000..630843f
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/02.csv
@@ -0,0 +1,3 @@
+4,2018,"good","recommend"
+5,2018,,"not recommend"
+6,2018,"good",
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/empty.csv b/asterixdb/asterix-app/data/csv/empty.csv
new file mode 100644
index 0000000..3f2ff2d
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/empty.csv
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/asterixdb/asterix-app/data/csv/sample_09.csv b/asterixdb/asterix-app/data/csv/sample_09.csv
new file mode 100644
index 0000000..b14219d
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_09.csv
@@ -0,0 +1,17 @@
+a,b,c,d,e
+0,", boo", 1,2,3
+1,"","",❤,
+2,3,4,\n,
+3,"quoted ""f"" field",,,
+4,4,,,
+5,"{""vehicle"": ""car"", ""location"": [2.0, 0.1]}",,,
+6,2,3,,
+7,8,9,,
+8,2,3,,
+9,8,9,,
+10,"field
+""f""
+with multiple lines",,,
+11,4,,,
+12,5,ʤ,,
+John,Green,111 downtown st.,"city, state",99999
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/sample_10.csv b/asterixdb/asterix-app/data/csv/sample_10.csv
new file mode 100644
index 0000000..3beee08
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_10.csv
@@ -0,0 +1,39 @@
+1,"?/ Text ending with a backslash / \",2000-09-03 07:12:22
+2,non quoted text!yes......,2003-08-09 22:34:19
+3,Text with more sentences. Another sentence.,2003-09-12 05:29:12
+4,"Quoted text.. yes.",2003-09-13 17:21:49
+5,Another text,2003-01-21 23:31:41
+6,Text with' quotes.,2003-09-14 20:15:50
+7,Text with quote's,2003-09-14 18:34:03
+8,"Text with quotes '",2003-01-28 20:32:13
+9,"Text with quotes """,2003-01-18 11:44:15
+10,Text with question marks!?!?,2003-09-18 06:25:56
+11,""" Text that starts with quotes",2003-09-12 00:31:24
+12,"Text with \"" backslash and quotes",2003-09-13 20:30:06
+13,"Text with \"" backslash and quotes\""",2003-09-14 16:20:36
+14,"Text that has comma ,",2003-09-12 08:21:18
+15,"Text that has "","" quoted comma",2003-09-12 08:21:18
+16,",Text that has ",2003-09-12 08:21:18
+17,","",Text that has ",2003-09-12 08:21:18
+18,"Text with commas,inside it., yes",2003-09-13 23:42:14
+19,"Text that has \n inside ",2003-09-12 08:21:18
+20,"Text that has \\\n inside ",2003-09-12 08:21:18
+21,text with :),2003-09-05 19:15:34
+22,"Text that has \\\"" inside \\",2003-09-12 08:21:18
+23,"Text that has \\\"" inside \\""",2003-09-12 08:21:18
+24,"""text that spans multiple
+Lines and more
+Lines ane more and more
+Lines ...
+And yet more lines
+And more""",2011-09-19 01:09:09
+25,"Text ""
+more lines",2011-09-19 01:09:09
+26,"""
+",2011-09-19 01:09:09
+27,"Text",""
+28,"Text","2011-09-19 01:09:09"
+29,"Text\.","2011-09-19 01:09:09"
+30,Text\.,"2011-09-19 01:09:09"
+31,"\.Text","2011-09-19 01:09:09"
+32,\.Text,"2011-09-19 01:09:09"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/sample_11.csv b/asterixdb/asterix-app/data/csv/sample_11.csv
new file mode 100644
index 0000000..b9a9571
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_11.csv
@@ -0,0 +1,4 @@
+1,","", b", 3,4,5
+","", b",4, 3,4,5
+,,,,
+"dd",,,,
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/sample_12.csv b/asterixdb/asterix-app/data/csv/sample_12.csv
new file mode 100644
index 0000000..0c9baf5
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_12.csv
@@ -0,0 +1,16 @@
+f1,f2,f3
+1,true,"text"
+2,false,"text"
+3,true,"text"
+4,true,""
+5,false,
+6,true,"text""
+more lines"
+7,false,"""
+"
+8,true,""
+9,false,"text"""
+10,false,text\.
+11,true,"text\."
+,false,\.text
+13,true,"\.text"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/sample_13.csv b/asterixdb/asterix-app/data/csv/sample_13.csv
new file mode 100644
index 0000000..9f53f56
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_13.csv
@@ -0,0 +1,11 @@
+
+
+f1,f2,f3,f4
+
+1,,"good","recommend"
+
+2,,"bad","not recommend"
+3,,"good",
+
+
+
diff --git a/asterixdb/asterix-app/data/tsv/01.tsv b/asterixdb/asterix-app/data/tsv/01.tsv
new file mode 100644
index 0000000..98876c7
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/01.tsv
@@ -0,0 +1,3 @@
+1 "good" "recommend"
+2 "bad" "not recommend"
+3 "good" "recommend"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/02.tsv b/asterixdb/asterix-app/data/tsv/02.tsv
new file mode 100644
index 0000000..c01ce7c
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/02.tsv
@@ -0,0 +1,3 @@
+4 2018 "good" "recommend"
+5 2018 "not recommend"
+6 2018 "good" "recommend"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/tsv/sample_01.tsv b/asterixdb/asterix-app/data/tsv/sample_01.tsv
new file mode 100644
index 0000000..aab289a
--- /dev/null
+++ b/asterixdb/asterix-app/data/tsv/sample_01.tsv
@@ -0,0 +1,28 @@
+11 55 text field wih , charrrrrrrrrrr true 90 0.666666667
+12 55 text field with " charrrrrrrrrr false 90 0.666666667
+14 55 text field with ' charrrrrrrrrr false 90 0.666666667
+15 55 text field with \ charrrrrrrrrr false 90 0.666666667
+16 55 text field wih \, char true 90 0.666666667
+17 55 text field with \" charrrrrrrrr false 90 0.666666667
+18 55 text field with \' charrrrrrrrr false 90 0.666666667
+19 55 text field with \\ charrrrrrrrr false 90 0.666666667
+20 55 text field ending with charr , false 90 0.666666667
+21 55 text field ending with charr " false 90 0.666666667
+22 55 text field ending with charr ' false 90 0.666666667
+23 55 text field ending with charr \ false 90 0.666666667
+24 55 text field ending with charr \, false 90 0.666666667
+25 55 text field ending with charr \" false 90 0.666666667
+26 55 text field ending with charr \' false 90 0.666666667
+27 55 text field ending with charr \\ false 90 0.666666667
+28 55 ,text field starting with charr false 90 0.666666667
+29 55 "text field starting with charr false 90 0.666666667
+30 55 'text field starting with charr false 90 0.666666667
+31 55 \text field starting with charr false 90 0.666666667
+32 55 \,text field starting with char false 90 0.666666667
+33 55 \"text field starting with char false 90 0.666666667
+34 55 \'text field starting with char false 90 0.666666667
+35 55 \\text field starting with char false 90 0.666666667
+36 55 "text field inside with char" false 90 0.666666667
+37 55 text field with charrrrrrrrr false 90 0.666666667
+38 55 text field with "" charrrrrrrrr false 90 0.666666667
+39 55 text field "with" charrrrrrrrrr false 90 0.666666667
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f83e21e..ea2ce40 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -93,6 +93,7 @@
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.lang.common.base.IReturningStatement;
import org.apache.asterix.lang.common.base.IRewriterFactory;
@@ -677,6 +678,7 @@
itemTypeDataverseName + "." + itemTypeName);
}
itemType = itemTypeEntity.getDatatype();
+ validateDatasetItemType(dsType, itemType, false, sourceLoc);
break;
case RECORD:
itemTypeDataverseName = dataverseName;
@@ -684,6 +686,7 @@
lockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), itemTypeDataverseName,
itemTypeName);
itemType = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx);
+ validateDatasetItemType(dsType, itemType, false, sourceLoc);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
new Datatype(itemTypeDataverseName, itemTypeName, itemType, true));
break;
@@ -701,13 +704,8 @@
} else {
validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false, sourceLoc);
}
- switch (dd.getDatasetType()) {
+ switch (dsType) {
case INTERNAL:
- if (itemType.getTypeTag() != ATypeTag.OBJECT) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
- "Dataset type has to be a record type.");
- }
-
IAType metaItemType = null;
if (metaItemTypeExpr != null) {
switch (metaItemTypeExpr.getTypeKind()) {
@@ -720,10 +718,7 @@
metaItemTypeDataverseName + "." + metaItemTypeName);
}
metaItemType = metaItemTypeEntity.getDatatype();
- if (metaItemType.getTypeTag() != ATypeTag.OBJECT) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
- "Dataset meta type has to be a record type.");
- }
+ validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
break;
case RECORD:
metaItemTypeDataverseName = dataverseName;
@@ -732,6 +727,7 @@
metaItemTypeDataverseName, metaItemTypeName);
metaItemType = translateType(metaItemTypeDataverseName, metaItemTypeName,
metaItemTypeExpr, mdTxnCtx);
+ validateDatasetItemType(dsType, metaItemType, true, sourceLoc);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx,
new Datatype(metaItemTypeDataverseName, metaItemTypeName, metaItemType, true));
break;
@@ -769,12 +765,15 @@
case EXTERNAL:
ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
Map<String, String> properties = createExternalDatasetProperties(dd, metadataProvider, mdTxnCtx);
+ ExternalDataUtils.normalize(properties);
+ ExternalDataUtils.validate(properties);
+ validateExternalDatasetDetails(externalDetails, properties);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
default:
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
- "Unknown dataset type " + dd.getDatasetType());
+ "Unknown dataset type " + dsType);
}
// #. initialize DatasetIdFactory if it is not initialized.
@@ -788,7 +787,7 @@
datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
MetadataUtil.PENDING_ADD_OP, compressionScheme);
MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
- if (dd.getDatasetType() == DatasetType.INTERNAL) {
+ if (dsType == DatasetType.INTERNAL) {
JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataset, metadataProvider);
// #. make metadataTxn commit before calling runJob.
@@ -857,6 +856,14 @@
}
}
+ protected void validateDatasetItemType(DatasetType datasetType, IAType itemType, boolean isMetaItemType,
+ SourceLocation sourceLoc) throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.OBJECT) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
+ String.format("Dataset %s has to be a record type.", isMetaItemType ? "meta type" : "type"));
+ }
+ }
+
protected Map<String, String> createExternalDatasetProperties(DatasetDecl dd, MetadataProvider metadataProvider,
MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
@@ -2218,9 +2225,11 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
lockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
try {
- CompiledLoadFromFileStatement cls =
- new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName(), loadStmt.getAdapter(),
- loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
+ Map<String, String> properties = loadStmt.getProperties();
+ ExternalDataUtils.normalize(properties);
+ ExternalDataUtils.validate(properties);
+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName,
+ loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
cls.setSourceLocation(stmt.getSourceLocation());
JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
null, responsePrinter, warningCollector);
@@ -2417,7 +2426,10 @@
"A feed with this name " + feedName + " already exists.");
}
}
- feed = new Feed(dataverseName, feedName, cfs.getConfiguration());
+ Map<String, String> configuration = cfs.getConfiguration();
+ ExternalDataUtils.normalize(configuration);
+ ExternalDataUtils.validate(configuration);
+ feed = new Feed(dataverseName, feedName, configuration);
FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx);
MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3426,4 +3438,14 @@
throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
}
}
+
+ protected void validateExternalDatasetDetails(ExternalDetailsDecl externalDetails, Map<String, String> properties)
+ throws RuntimeDataException {
+ String adapter = externalDetails.getAdapter();
+ // "format" parameter is needed for "S3" data source
+ if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(adapter)
+ && properties.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_FORMAT);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 3b4cdf8..d2158ba 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.test.external_dataset.aws;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -66,9 +69,13 @@
private static S3Client client;
private static final String S3_MOCK_SERVER_BUCKET = "playground";
private static final String S3_MOCK_SERVER_BUCKET_DEFINITION = "json-data/reviews/"; // data resides here
+ private static final String S3_MOCK_SERVER_BUCKET_CSV_DEFINITION = "csv-data/reviews/"; // data resides here
+ private static final String S3_MOCK_SERVER_BUCKET_TSV_DEFINITION = "tsv-data/reviews/"; // data resides here
private static final String S3_MOCK_SERVER_REGION = "us-west-2";
private static final int S3_MOCK_SERVER_PORT = 8001;
private static final String S3_MOCK_SERVER_HOSTNAME = "http://localhost:" + S3_MOCK_SERVER_PORT;
+ private static final String CSV_DATA_PATH = joinPath("data", "csv");
+ private static final String TSV_DATA_PATH = joinPath("data", "tsv");
@BeforeClass
public static void setUp() throws Exception {
@@ -210,6 +217,26 @@
PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
.key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q2/2.json").build(),
RequestBody.fromString("{\"id\": 14, \"year\": 2019, \"quarter\": 2, \"review\": \"bad\"}"));
+
+ LOGGER.info("Adding CSV files to the bucket");
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_CSV_DEFINITION + "01.csv").build(),
+ RequestBody.fromFile(Paths.get(CSV_DATA_PATH, "01.csv")));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_CSV_DEFINITION + "2018/01.csv").build(),
+ RequestBody.fromFile(Paths.get(CSV_DATA_PATH, "02.csv")));
+
+ LOGGER.info("Adding TSV files to the bucket");
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_TSV_DEFINITION + "01.tsv").build(),
+ RequestBody.fromFile(Paths.get(TSV_DATA_PATH, "01.tsv")));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_TSV_DEFINITION + "2018/01.tsv").build(),
+ RequestBody.fromFile(Paths.get(TSV_DATA_PATH, "02.tsv")));
LOGGER.info("Files added successfully");
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
new file mode 100644
index 0000000..3a4823f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ResultStreamingFailureTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.hyracks.control.nc.result.ResultPartitionReader;
+import org.apache.hyracks.util.Span;
+import org.apache.hyracks.util.ThreadDumpUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ResultStreamingFailureTest {
+
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+ @Before
+ public void setUp() throws Exception {
+ integrationUtil.init(true, AsterixHyracksIntegrationUtil.DEFAULT_CONF_FILE);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(true);
+ }
+
+ @Test
+ public void resultStreamingFailureTest() throws Exception {
+ queryAndDropConnection();
+ // allow result sender to terminate and ensure no leaks
+ Span timeout = Span.start(5, TimeUnit.SECONDS);
+ while (!timeout.elapsed()) {
+ String threadDump = ThreadDumpUtil.takeDumpString();
+ if (!threadDump.contains(ResultPartitionReader.class.getName())) {
+ return;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ throw new AssertionError("found leaking senders in:\n" + ThreadDumpUtil.takeDumpString());
+ }
+
+ private void queryAndDropConnection() throws IOException {
+ try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+ final List<NameValuePair> params = new ArrayList<>();
+ params.add(new BasicNameValuePair("statement", "select * from range(1, 10000000) r;"));
+ HttpPost request = new HttpPost("http://localhost:19004/query/service");
+ request.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
+ CloseableHttpResponse response = httpClient.execute(request);
+ Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+ // close connection without streaming the result
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
new file mode 100644
index 0000000..f7fe18c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE t1 AS {f1: string, f2: string, f3: string, f4: string, f5: string};
+CREATE TYPE t2 AS {f1: string, f2: string, f3: string};
+CREATE TYPE t3 AS {f1: int?, f2: boolean, f3: string?};
+CREATE TYPE t4 AS {f1: string, f2: string, f3: string, f4: string};
+
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="CSV"), ("header"="FALSE"));
+CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sample_10.csv"), ("format"="Csv"), ("header"="False"));
+CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"), ("header"="FALSE"));
+CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"));
+CREATE EXTERNAL DATASET ds5(t4) USING localfs(("path"="asterix_nc1://data/csv/sample_13.csv"), ("format"="csv"), ("header"="True"));
+CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty.csv"), ("format"="csv"), ("header"="false"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.2.query.sqlpp
new file mode 100644
index 0000000..d870372
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.2.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.3.query.sqlpp
new file mode 100644
index 0000000..64a2f8a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds2 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.4.query.sqlpp
new file mode 100644
index 0000000..313198c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.4.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds3 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.5.query.sqlpp
new file mode 100644
index 0000000..065de4e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.5.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds4 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp
new file mode 100644
index 0000000..a3d113d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds5 v SELECT VALUE v ORDER BY v.f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp
new file mode 100644
index 0000000..2e5b312
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds6 v SELECT VALUE v ORDER BY v.f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp
new file mode 100644
index 0000000..86a1b59
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
new file mode 100644
index 0000000..cabe54b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE t1 AS {f1: int, f2: int, f3: string, f4: boolean, f5: bigint, f6: double};
+
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/tsv/sample_01.tsv"), ("format"="tsv"), ("header"="FALSE"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.2.query.sqlpp
new file mode 100644
index 0000000..d870372
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.2.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.3.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.3.ddl.sqlpp
new file mode 100644
index 0000000..86a1b59
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.3.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
new file mode 100644
index 0000000..6184b19
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+DROP TYPE test IF EXISTS;
+CREATE TYPE test AS {id: int, year: int?, review: string, details: string?};
+
+DROP DATASET test IF EXISTS;
+CREATE EXTERNAL DATASET test(test) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="csv-data/reviews"),
+("format"="Csv"),
+("header"="false")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.002.query.sqlpp
new file mode 100644
index 0000000..6e31eb3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.002.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM test SELECT VALUE test ORDER BY id ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.003.ddl.sqlpp
new file mode 100644
index 0000000..0ff713d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.003.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATASET test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
new file mode 100644
index 0000000..194adf6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+DROP TYPE test IF EXISTS;
+CREATE TYPE test AS {id: int, year: int?, review: string, details: string?};
+
+DROP DATASET test IF EXISTS;
+CREATE EXTERNAL DATASET test(test) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="tsv-data/reviews"),
+("format"="TSV"),
+("header"="False")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.002.query.sqlpp
new file mode 100644
index 0000000..6e31eb3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.002.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM test SELECT VALUE test ORDER BY id ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.003.ddl.sqlpp
new file mode 100644
index 0000000..0ff713d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.003.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATASET test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp
new file mode 100644
index 0000000..e0fc056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// "format" parameter is missing for S3
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+DROP TYPE test IF EXISTS;
+CREATE TYPE test AS {id: int, year: int?, review: string, details: string?};
+
+DROP DATASET test IF EXISTS;
+CREATE EXTERNAL DATASET test(test) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="tsv-data/reviews")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/list/get-item_03/get-item_03.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/list/get-item_03/get-item_03.4.query.sqlpp
new file mode 100644
index 0000000..ab27331
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/list/get-item_03/get-item_03.4.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+{
+"ta1": [1, 2, 3][-1],
+"ta2": [1, 2, 3][-2],
+"ta3": [1, 2, 3][-3],
+"ta4": [1, 2, 3][-4] is missing,
+"tb1": [1, 2, 3][-((select value id from test where id = 1)[0])],
+"tb2": [1, 2, 3][-((select value id from test where id = 1)[0])-1],
+"tb3": [1, 2, 3][-((select value id from test where id = 1)[0])-2],
+"tb4": [1, 2, 3][-((select value id from test where id = 1)[0])-3] is missing
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_01/case_01.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_01/case_01.1.query.sqlpp
index c2a4cea..7f9647b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_01/case_01.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_01/case_01.1.query.sqlpp
@@ -24,5 +24,5 @@
WHEN MISSING THEN -2
ELSE 2.0/t
END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_02/case_02.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_02/case_02.1.query.sqlpp
index ab19bcb..095eb20 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_02/case_02.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_02/case_02.1.query.sqlpp
@@ -24,6 +24,6 @@
WHEN t IS MISSING THEN -2
ELSE 2.0/t
END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_03/case_03.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_03/case_03.1.query.sqlpp
index 7c3e566..12cd3f6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_03/case_03.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_03/case_03.1.query.sqlpp
@@ -24,5 +24,5 @@
WHEN t IS MISSING THEN (SELECT -2 AS r)
ELSE (SELECT -3 AS r)
END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_04/case_04.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_04/case_04.1.query.sqlpp
index 154e611..e99da85 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_04/case_04.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_04/case_04.1.query.sqlpp
@@ -24,5 +24,5 @@
WHEN t IS MISSING THEN (SELECT -2)
ELSE 2.0/t
END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_05/case_05.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_05/case_05.1.query.sqlpp
index 32030a7..5940b19 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_05/case_05.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_05/case_05.1.query.sqlpp
@@ -26,5 +26,5 @@
WHEN t=2 THEN MISSING
ELSE 2.0/t
END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_06/case_06.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_06/case_06.1.query.sqlpp
index 5bf2786..8fbaa7a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_06/case_06.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_06/case_06.1.query.sqlpp
@@ -22,5 +22,5 @@
WHEN t = 0 THEN MISSING
ELSE NULL
END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_07/case_07.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_07/case_07.1.query.sqlpp
index 4850047..27180b5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_07/case_07.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/case_07/case_07.1.query.sqlpp
@@ -27,4 +27,4 @@
WHEN t IS UNKNOWN THEN (SELECT -3) // Should never enter this THEN branch.
ELSE 2.0/t
END
-FROM [0, 1, 2, 4, NULL, [0][-1]] t;
+FROM [0, 1, 2, 4, NULL, MISSING] t;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/numeric/add_double/add_double.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/numeric/add_double/add_double.1.query.sqlpp
index 366f2b1..a5b1e1b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/numeric/add_double/add_double.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/numeric/add_double/add_double.1.query.sqlpp
@@ -18,4 +18,4 @@
*/
-{'result1':(double('-6.5d') + tinyint('+1')),'result2':(double('-6.5d') + smallint('2')),'result3':(double('-6.5d') + integer('+3')),'result4':(double('-6.5d') + bigint('-4')),'result5':(double('-6.5d') + float('-5.5f')),'result6':(double('-6.5d') + double('-6.5d')),'result7':(double('-6.5d') + null), 'result8':double('-6.5d') + [1.0][-1]};
+{'result1':(double('-6.5d') + tinyint('+1')),'result2':(double('-6.5d') + smallint('2')),'result3':(double('-6.5d') + integer('+3')),'result4':(double('-6.5d') + bigint('-4')),'result5':(double('-6.5d') + float('-5.5f')),'result6':(double('-6.5d') + double('-6.5d')),'result7':(double('-6.5d') + null), 'result8':double('-6.5d') + MISSING};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.1.ddl.sqlpp
new file mode 100644
index 0000000..d881c98
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.1.ddl.sqlpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test various combinations of grouping sets
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type tenkType as closed {
+ unique1 : integer,
+ unique2 : integer,
+ two : integer,
+ four : integer,
+ ten : integer,
+ twenty : integer,
+ hundred : integer,
+ thousand : integer,
+ twothousand : integer,
+ fivethous : integer,
+ tenthous : integer,
+ odd100 : integer,
+ even100 : integer,
+ stringu1 : string,
+ stringu2 : string,
+ string4 : string
+};
+
+create dataset tenk(tenkType) primary key unique2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.2.update.sqlpp
new file mode 100644
index 0000000..7128175
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.2.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+load dataset tenk using localfs ((`path`=`asterix_nc1://data/tenk.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.3.query.sqlpp
new file mode 100644
index 0000000..ddd69bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/union/union_nested/union_nested.3.query.sqlpp
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select two, four, ten, twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by two, four, ten, twenty
+
+union all
+select two, four, ten, twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by -two as two, -four as four, -ten as ten, -twenty as twenty
+
+union all
+select two, four, ten, twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by tostring(two) as two, tostring(four) as four, tostring(ten) as ten, tostring(twenty) as twenty
+
+union all
+select two, four, ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by two, four, ten
+
+union all
+select two, four, ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by -two as two, -four as four, -ten as ten
+
+union all
+select two, four, ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by tostring(two) as two, tostring(four) as four, tostring(ten) as ten
+
+union all
+select two, four, null as ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by two, four
+
+union all
+select two, four, null as ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by -two as two, -four as four
+
+union all
+select two, four, null as ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by tostring(two) as two, tostring(four) as four
+
+union all
+select two, null as four, null as ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by two
+
+union all
+select two, null as four, null as ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by -two as two
+
+union all
+select two, null as four, null as ten, null as twenty,
+ sum(unique1) as agg_sum
+from tenk
+group by tostring(two) as two
+
+order by two, four, ten, twenty;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.2.adm
new file mode 100644
index 0000000..5c84fb8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.2.adm
@@ -0,0 +1,15 @@
+{ "f1": "a", "f2": "b", "f3": "c", "f4": "d", "f5": "e" }
+{ "f1": "0", "f2": ", boo", "f3": " 1", "f4": "2", "f5": "3" }
+{ "f1": "1", "f2": "", "f3": "", "f4": "❤", "f5": "" }
+{ "f1": "2", "f2": "3", "f3": "4", "f4": "\\n", "f5": "" }
+{ "f1": "3", "f2": "quoted \"f\" field", "f3": "", "f4": "", "f5": "" }
+{ "f1": "4", "f2": "4", "f3": "", "f4": "", "f5": "" }
+{ "f1": "5", "f2": "{\"vehicle\": \"car\", \"location\": [2.0, 0.1]}", "f3": "", "f4": "", "f5": "" }
+{ "f1": "6", "f2": "2", "f3": "3", "f4": "", "f5": "" }
+{ "f1": "7", "f2": "8", "f3": "9", "f4": "", "f5": "" }
+{ "f1": "8", "f2": "2", "f3": "3", "f4": "", "f5": "" }
+{ "f1": "9", "f2": "8", "f3": "9", "f4": "", "f5": "" }
+{ "f1": "10", "f2": "field\n\"f\"\nwith multiple lines", "f3": "", "f4": "", "f5": "" }
+{ "f1": "11", "f2": "4", "f3": "", "f4": "", "f5": "" }
+{ "f1": "12", "f2": "5", "f3": "ʤ", "f4": "", "f5": "" }
+{ "f1": "John", "f2": "Green", "f3": "111 downtown st.", "f4": "city, state", "f5": "99999" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.3.adm
new file mode 100644
index 0000000..80f5fb7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.3.adm
@@ -0,0 +1,32 @@
+{ "f1": "1", "f2": "?/ Text ending with a backslash / \\", "f3": "2000-09-03 07:12:22" }
+{ "f1": "2", "f2": "non quoted text!yes......", "f3": "2003-08-09 22:34:19" }
+{ "f1": "3", "f2": "Text with more sentences. Another sentence.", "f3": "2003-09-12 05:29:12" }
+{ "f1": "4", "f2": "Quoted text.. yes.", "f3": "2003-09-13 17:21:49" }
+{ "f1": "5", "f2": "Another text", "f3": "2003-01-21 23:31:41" }
+{ "f1": "6", "f2": "Text with' quotes.", "f3": "2003-09-14 20:15:50" }
+{ "f1": "7", "f2": "Text with quote's", "f3": "2003-09-14 18:34:03" }
+{ "f1": "8", "f2": "Text with quotes '", "f3": "2003-01-28 20:32:13" }
+{ "f1": "9", "f2": "Text with quotes \"", "f3": "2003-01-18 11:44:15" }
+{ "f1": "10", "f2": "Text with question marks!?!?", "f3": "2003-09-18 06:25:56" }
+{ "f1": "11", "f2": "\" Text that starts with quotes", "f3": "2003-09-12 00:31:24" }
+{ "f1": "12", "f2": "Text with \\\" backslash and quotes", "f3": "2003-09-13 20:30:06" }
+{ "f1": "13", "f2": "Text with \\\" backslash and quotes\\\"", "f3": "2003-09-14 16:20:36" }
+{ "f1": "14", "f2": "Text that has comma ,", "f3": "2003-09-12 08:21:18" }
+{ "f1": "15", "f2": "Text that has \",\" quoted comma", "f3": "2003-09-12 08:21:18" }
+{ "f1": "16", "f2": ",Text that has ", "f3": "2003-09-12 08:21:18" }
+{ "f1": "17", "f2": ",\",Text that has ", "f3": "2003-09-12 08:21:18" }
+{ "f1": "18", "f2": "Text with commas,inside it., yes", "f3": "2003-09-13 23:42:14" }
+{ "f1": "19", "f2": "Text that has \\n inside ", "f3": "2003-09-12 08:21:18" }
+{ "f1": "20", "f2": "Text that has \\\\\\n inside ", "f3": "2003-09-12 08:21:18" }
+{ "f1": "21", "f2": "text with :)", "f3": "2003-09-05 19:15:34" }
+{ "f1": "22", "f2": "Text that has \\\\\\\" inside \\\\", "f3": "2003-09-12 08:21:18" }
+{ "f1": "23", "f2": "Text that has \\\\\\\" inside \\\\\"", "f3": "2003-09-12 08:21:18" }
+{ "f1": "24", "f2": "\"text that spans multiple\nLines and more\nLines ane more and more\nLines ...\nAnd yet more lines\nAnd more\"", "f3": "2011-09-19 01:09:09" }
+{ "f1": "25", "f2": "Text \"\nmore lines", "f3": "2011-09-19 01:09:09" }
+{ "f1": "26", "f2": "\"\n", "f3": "2011-09-19 01:09:09" }
+{ "f1": "27", "f2": "Text", "f3": "" }
+{ "f1": "28", "f2": "Text", "f3": "2011-09-19 01:09:09" }
+{ "f1": "29", "f2": "Text\\.", "f3": "2011-09-19 01:09:09" }
+{ "f1": "30", "f2": "Text\\.", "f3": "2011-09-19 01:09:09" }
+{ "f1": "31", "f2": "\\.Text", "f3": "2011-09-19 01:09:09" }
+{ "f1": "32", "f2": "\\.Text", "f3": "2011-09-19 01:09:09" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.4.adm
new file mode 100644
index 0000000..5c61b4a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.4.adm
@@ -0,0 +1,4 @@
+{ "f1": "1", "f2": ",\", b", "f3": " 3", "f4": "4", "f5": "5" }
+{ "f1": ",\", b", "f2": "4", "f3": " 3", "f4": "4", "f5": "5" }
+{ "f1": "", "f2": "", "f3": "", "f4": "", "f5": "" }
+{ "f1": "dd", "f2": "", "f3": "", "f4": "", "f5": "" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.5.adm
new file mode 100644
index 0000000..4b80e26
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.5.adm
@@ -0,0 +1,13 @@
+{ "f1": 1, "f2": true, "f3": "text" }
+{ "f1": 2, "f2": false, "f3": "text" }
+{ "f1": 3, "f2": true, "f3": "text" }
+{ "f1": 4, "f2": true, "f3": null }
+{ "f1": 5, "f2": false, "f3": null }
+{ "f1": 6, "f2": true, "f3": "text\"\nmore lines" }
+{ "f1": 7, "f2": false, "f3": "\"\n" }
+{ "f1": 8, "f2": true, "f3": null }
+{ "f1": 9, "f2": false, "f3": "text\"" }
+{ "f1": 10, "f2": false, "f3": "text\\." }
+{ "f1": 11, "f2": true, "f3": "text\\." }
+{ "f1": null, "f2": false, "f3": "\\.text" }
+{ "f1": 13, "f2": true, "f3": "\\.text" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm
new file mode 100644
index 0000000..9a1d1c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm
@@ -0,0 +1,3 @@
+{ "f1": "1", "f2": "", "f3": "good", "f4": "recommend" }
+{ "f1": "2", "f2": "", "f3": "bad", "f4": "not recommend" }
+{ "f1": "3", "f2": "", "f3": "good", "f4": "" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/tsv-parser-001/tsv-parser-001.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/tsv-parser-001/tsv-parser-001.2.adm
new file mode 100644
index 0000000..fbe287b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/tsv-parser-001/tsv-parser-001.2.adm
@@ -0,0 +1,28 @@
+{ "f1": 11, "f2": 55, "f3": "text field wih , charrrrrrrrrrr", "f4": true, "f5": 90, "f6": 0.666666667 }
+{ "f1": 12, "f2": 55, "f3": "text field with \" charrrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 14, "f2": 55, "f3": "text field with ' charrrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 15, "f2": 55, "f3": "text field with \\ charrrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 16, "f2": 55, "f3": "text field wih \\, char ", "f4": true, "f5": 90, "f6": 0.666666667 }
+{ "f1": 17, "f2": 55, "f3": "text field with \\\" charrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 18, "f2": 55, "f3": "text field with \\' charrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 19, "f2": 55, "f3": "text field with \\\\ charrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 20, "f2": 55, "f3": "text field ending with charr ,", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 21, "f2": 55, "f3": "text field ending with charr \"", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 22, "f2": 55, "f3": "text field ending with charr '", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 23, "f2": 55, "f3": "text field ending with charr \\", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 24, "f2": 55, "f3": "text field ending with charr \\,", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 25, "f2": 55, "f3": "text field ending with charr \\\"", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 26, "f2": 55, "f3": "text field ending with charr \\'", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 27, "f2": 55, "f3": "text field ending with charr \\\\", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 28, "f2": 55, "f3": ",text field starting with charr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 29, "f2": 55, "f3": "\"text field starting with charr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 30, "f2": 55, "f3": "'text field starting with charr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 31, "f2": 55, "f3": "\\text field starting with charr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 32, "f2": 55, "f3": "\\,text field starting with char", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 33, "f2": 55, "f3": "\\\"text field starting with char", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 34, "f2": 55, "f3": "\\'text field starting with char", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 35, "f2": 55, "f3": "\\\\text field starting with char", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 36, "f2": 55, "f3": "\"text field inside with char\"", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 37, "f2": 55, "f3": " text field with charrrrrrrrr ", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 38, "f2": 55, "f3": "text field with \"\" charrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
+{ "f1": 39, "f2": 55, "f3": "text field \"with\" charrrrrrrrrr", "f4": false, "f5": 90, "f6": 0.666666667 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/001/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/001/external_dataset.001.adm
new file mode 100644
index 0000000..93d1b57
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/001/external_dataset.001.adm
@@ -0,0 +1,6 @@
+{ "id": 1, "year": null, "review": "good", "details": "recommend" }
+{ "id": 2, "year": null, "review": "bad", "details": "not recommend" }
+{ "id": 3, "year": null, "review": "good", "details": null }
+{ "id": 4, "year": 2018, "review": "good", "details": "recommend" }
+{ "id": 5, "year": 2018, "review": "", "details": "not recommend" }
+{ "id": 6, "year": 2018, "review": "good", "details": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/002/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/002/external_dataset.001.adm
new file mode 100644
index 0000000..1954b05
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/002/external_dataset.001.adm
@@ -0,0 +1,6 @@
+{ "id": 1, "year": null, "review": "\"good\"", "details": "\"recommend\"" }
+{ "id": 2, "year": null, "review": "\"bad\"", "details": "\"not recommend\"" }
+{ "id": 3, "year": null, "review": "\"good\"", "details": "\"recommend\"" }
+{ "id": 4, "year": 2018, "review": "\"good\"", "details": "\"recommend\"" }
+{ "id": 5, "year": 2018, "review": "", "details": "\"not recommend\"" }
+{ "id": 6, "year": 2018, "review": "\"good\"", "details": "\"recommend\"" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.3.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.1.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.3.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.4.adm
new file mode 100644
index 0000000..27ec71f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/list/get-item_03/get-item_03.4.adm
@@ -0,0 +1 @@
+{ "ta1": 3, "ta2": 2, "ta3": 1, "ta4": true, "tb1": 3, "tb2": 2, "tb3": 1, "tb4": true }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_nested/union_nested.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_nested/union_nested.3.adm
new file mode 100644
index 0000000..4737f9e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/union/union_nested/union_nested.3.adm
@@ -0,0 +1,138 @@
+{ "agg_sum": 25000000, "two": -1, "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12502500, "two": -1, "four": -3, "ten": null, "twenty": null }
+{ "agg_sum": 2504500, "two": -1, "four": -3, "ten": -9, "twenty": null }
+{ "agg_sum": 2504500, "two": -1, "four": -3, "ten": -9, "twenty": -19 }
+{ "agg_sum": 2498500, "two": -1, "four": -3, "ten": -7, "twenty": null }
+{ "agg_sum": 2498500, "two": -1, "four": -3, "ten": -7, "twenty": -7 }
+{ "agg_sum": 2502500, "two": -1, "four": -3, "ten": -5, "twenty": null }
+{ "agg_sum": 2502500, "two": -1, "four": -3, "ten": -5, "twenty": -15 }
+{ "agg_sum": 2496500, "two": -1, "four": -3, "ten": -3, "twenty": null }
+{ "agg_sum": 2496500, "two": -1, "four": -3, "ten": -3, "twenty": -3 }
+{ "agg_sum": 2500500, "two": -1, "four": -3, "ten": -1, "twenty": null }
+{ "agg_sum": 2500500, "two": -1, "four": -3, "ten": -1, "twenty": -11 }
+{ "agg_sum": 12497500, "two": -1, "four": -1, "ten": null, "twenty": null }
+{ "agg_sum": 2499500, "two": -1, "four": -1, "ten": -9, "twenty": null }
+{ "agg_sum": 2499500, "two": -1, "four": -1, "ten": -9, "twenty": -9 }
+{ "agg_sum": 2503500, "two": -1, "four": -1, "ten": -7, "twenty": null }
+{ "agg_sum": 2503500, "two": -1, "four": -1, "ten": -7, "twenty": -17 }
+{ "agg_sum": 2497500, "two": -1, "four": -1, "ten": -5, "twenty": null }
+{ "agg_sum": 2497500, "two": -1, "four": -1, "ten": -5, "twenty": -5 }
+{ "agg_sum": 2501500, "two": -1, "four": -1, "ten": -3, "twenty": null }
+{ "agg_sum": 2501500, "two": -1, "four": -1, "ten": -3, "twenty": -13 }
+{ "agg_sum": 2495500, "two": -1, "four": -1, "ten": -1, "twenty": null }
+{ "agg_sum": 2495500, "two": -1, "four": -1, "ten": -1, "twenty": -1 }
+{ "agg_sum": 24995000, "two": 0, "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 24995000, "two": 0, "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12500000, "two": 0, "four": -2, "ten": null, "twenty": null }
+{ "agg_sum": 2504000, "two": 0, "four": -2, "ten": -8, "twenty": null }
+{ "agg_sum": 2504000, "two": 0, "four": -2, "ten": -8, "twenty": -18 }
+{ "agg_sum": 2498000, "two": 0, "four": -2, "ten": -6, "twenty": null }
+{ "agg_sum": 2498000, "two": 0, "four": -2, "ten": -6, "twenty": -6 }
+{ "agg_sum": 2502000, "two": 0, "four": -2, "ten": -4, "twenty": null }
+{ "agg_sum": 2502000, "two": 0, "four": -2, "ten": -4, "twenty": -14 }
+{ "agg_sum": 2496000, "two": 0, "four": -2, "ten": -2, "twenty": null }
+{ "agg_sum": 2496000, "two": 0, "four": -2, "ten": -2, "twenty": -2 }
+{ "agg_sum": 2500000, "two": 0, "four": -2, "ten": 0, "twenty": null }
+{ "agg_sum": 2500000, "two": 0, "four": -2, "ten": 0, "twenty": -10 }
+{ "agg_sum": 12495000, "two": 0, "four": 0, "ten": null, "twenty": null }
+{ "agg_sum": 12495000, "two": 0, "four": 0, "ten": null, "twenty": null }
+{ "agg_sum": 2499000, "two": 0, "four": 0, "ten": -8, "twenty": null }
+{ "agg_sum": 2499000, "two": 0, "four": 0, "ten": -8, "twenty": -8 }
+{ "agg_sum": 2503000, "two": 0, "four": 0, "ten": -6, "twenty": null }
+{ "agg_sum": 2503000, "two": 0, "four": 0, "ten": -6, "twenty": -16 }
+{ "agg_sum": 2497000, "two": 0, "four": 0, "ten": -4, "twenty": null }
+{ "agg_sum": 2497000, "two": 0, "four": 0, "ten": -4, "twenty": -4 }
+{ "agg_sum": 2501000, "two": 0, "four": 0, "ten": -2, "twenty": null }
+{ "agg_sum": 2501000, "two": 0, "four": 0, "ten": -2, "twenty": -12 }
+{ "agg_sum": 2495000, "two": 0, "four": 0, "ten": 0, "twenty": null }
+{ "agg_sum": 2495000, "two": 0, "four": 0, "ten": 0, "twenty": null }
+{ "agg_sum": 2495000, "two": 0, "four": 0, "ten": 0, "twenty": 0 }
+{ "agg_sum": 2495000, "two": 0, "four": 0, "ten": 0, "twenty": 0 }
+{ "agg_sum": 2501000, "two": 0, "four": 0, "ten": 2, "twenty": null }
+{ "agg_sum": 2501000, "two": 0, "four": 0, "ten": 2, "twenty": 12 }
+{ "agg_sum": 2497000, "two": 0, "four": 0, "ten": 4, "twenty": null }
+{ "agg_sum": 2497000, "two": 0, "four": 0, "ten": 4, "twenty": 4 }
+{ "agg_sum": 2503000, "two": 0, "four": 0, "ten": 6, "twenty": null }
+{ "agg_sum": 2503000, "two": 0, "four": 0, "ten": 6, "twenty": 16 }
+{ "agg_sum": 2499000, "two": 0, "four": 0, "ten": 8, "twenty": null }
+{ "agg_sum": 2499000, "two": 0, "four": 0, "ten": 8, "twenty": 8 }
+{ "agg_sum": 12500000, "two": 0, "four": 2, "ten": null, "twenty": null }
+{ "agg_sum": 2500000, "two": 0, "four": 2, "ten": 0, "twenty": null }
+{ "agg_sum": 2500000, "two": 0, "four": 2, "ten": 0, "twenty": 10 }
+{ "agg_sum": 2496000, "two": 0, "four": 2, "ten": 2, "twenty": null }
+{ "agg_sum": 2496000, "two": 0, "four": 2, "ten": 2, "twenty": 2 }
+{ "agg_sum": 2502000, "two": 0, "four": 2, "ten": 4, "twenty": null }
+{ "agg_sum": 2502000, "two": 0, "four": 2, "ten": 4, "twenty": 14 }
+{ "agg_sum": 2498000, "two": 0, "four": 2, "ten": 6, "twenty": null }
+{ "agg_sum": 2498000, "two": 0, "four": 2, "ten": 6, "twenty": 6 }
+{ "agg_sum": 2504000, "two": 0, "four": 2, "ten": 8, "twenty": null }
+{ "agg_sum": 2504000, "two": 0, "four": 2, "ten": 8, "twenty": 18 }
+{ "agg_sum": 25000000, "two": 1, "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12497500, "two": 1, "four": 1, "ten": null, "twenty": null }
+{ "agg_sum": 2495500, "two": 1, "four": 1, "ten": 1, "twenty": null }
+{ "agg_sum": 2495500, "two": 1, "four": 1, "ten": 1, "twenty": 1 }
+{ "agg_sum": 2501500, "two": 1, "four": 1, "ten": 3, "twenty": null }
+{ "agg_sum": 2501500, "two": 1, "four": 1, "ten": 3, "twenty": 13 }
+{ "agg_sum": 2497500, "two": 1, "four": 1, "ten": 5, "twenty": null }
+{ "agg_sum": 2497500, "two": 1, "four": 1, "ten": 5, "twenty": 5 }
+{ "agg_sum": 2503500, "two": 1, "four": 1, "ten": 7, "twenty": null }
+{ "agg_sum": 2503500, "two": 1, "four": 1, "ten": 7, "twenty": 17 }
+{ "agg_sum": 2499500, "two": 1, "four": 1, "ten": 9, "twenty": null }
+{ "agg_sum": 2499500, "two": 1, "four": 1, "ten": 9, "twenty": 9 }
+{ "agg_sum": 12502500, "two": 1, "four": 3, "ten": null, "twenty": null }
+{ "agg_sum": 2500500, "two": 1, "four": 3, "ten": 1, "twenty": null }
+{ "agg_sum": 2500500, "two": 1, "four": 3, "ten": 1, "twenty": 11 }
+{ "agg_sum": 2496500, "two": 1, "four": 3, "ten": 3, "twenty": null }
+{ "agg_sum": 2496500, "two": 1, "four": 3, "ten": 3, "twenty": 3 }
+{ "agg_sum": 2502500, "two": 1, "four": 3, "ten": 5, "twenty": null }
+{ "agg_sum": 2502500, "two": 1, "four": 3, "ten": 5, "twenty": 15 }
+{ "agg_sum": 2498500, "two": 1, "four": 3, "ten": 7, "twenty": null }
+{ "agg_sum": 2498500, "two": 1, "four": 3, "ten": 7, "twenty": 7 }
+{ "agg_sum": 2504500, "two": 1, "four": 3, "ten": 9, "twenty": null }
+{ "agg_sum": 2504500, "two": 1, "four": 3, "ten": 9, "twenty": 19 }
+{ "agg_sum": 24995000, "two": "0", "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12495000, "two": "0", "four": "0", "ten": null, "twenty": null }
+{ "agg_sum": 2495000, "two": "0", "four": "0", "ten": "0", "twenty": null }
+{ "agg_sum": 2495000, "two": "0", "four": "0", "ten": "0", "twenty": "0" }
+{ "agg_sum": 2501000, "two": "0", "four": "0", "ten": "2", "twenty": null }
+{ "agg_sum": 2501000, "two": "0", "four": "0", "ten": "2", "twenty": "12" }
+{ "agg_sum": 2497000, "two": "0", "four": "0", "ten": "4", "twenty": null }
+{ "agg_sum": 2497000, "two": "0", "four": "0", "ten": "4", "twenty": "4" }
+{ "agg_sum": 2503000, "two": "0", "four": "0", "ten": "6", "twenty": null }
+{ "agg_sum": 2503000, "two": "0", "four": "0", "ten": "6", "twenty": "16" }
+{ "agg_sum": 2499000, "two": "0", "four": "0", "ten": "8", "twenty": null }
+{ "agg_sum": 2499000, "two": "0", "four": "0", "ten": "8", "twenty": "8" }
+{ "agg_sum": 12500000, "two": "0", "four": "2", "ten": null, "twenty": null }
+{ "agg_sum": 2500000, "two": "0", "four": "2", "ten": "0", "twenty": null }
+{ "agg_sum": 2500000, "two": "0", "four": "2", "ten": "0", "twenty": "10" }
+{ "agg_sum": 2496000, "two": "0", "four": "2", "ten": "2", "twenty": null }
+{ "agg_sum": 2496000, "two": "0", "four": "2", "ten": "2", "twenty": "2" }
+{ "agg_sum": 2502000, "two": "0", "four": "2", "ten": "4", "twenty": null }
+{ "agg_sum": 2502000, "two": "0", "four": "2", "ten": "4", "twenty": "14" }
+{ "agg_sum": 2498000, "two": "0", "four": "2", "ten": "6", "twenty": null }
+{ "agg_sum": 2498000, "two": "0", "four": "2", "ten": "6", "twenty": "6" }
+{ "agg_sum": 2504000, "two": "0", "four": "2", "ten": "8", "twenty": null }
+{ "agg_sum": 2504000, "two": "0", "four": "2", "ten": "8", "twenty": "18" }
+{ "agg_sum": 25000000, "two": "1", "four": null, "ten": null, "twenty": null }
+{ "agg_sum": 12497500, "two": "1", "four": "1", "ten": null, "twenty": null }
+{ "agg_sum": 2495500, "two": "1", "four": "1", "ten": "1", "twenty": null }
+{ "agg_sum": 2495500, "two": "1", "four": "1", "ten": "1", "twenty": "1" }
+{ "agg_sum": 2501500, "two": "1", "four": "1", "ten": "3", "twenty": null }
+{ "agg_sum": 2501500, "two": "1", "four": "1", "ten": "3", "twenty": "13" }
+{ "agg_sum": 2497500, "two": "1", "four": "1", "ten": "5", "twenty": null }
+{ "agg_sum": 2497500, "two": "1", "four": "1", "ten": "5", "twenty": "5" }
+{ "agg_sum": 2503500, "two": "1", "four": "1", "ten": "7", "twenty": null }
+{ "agg_sum": 2503500, "two": "1", "four": "1", "ten": "7", "twenty": "17" }
+{ "agg_sum": 2499500, "two": "1", "four": "1", "ten": "9", "twenty": null }
+{ "agg_sum": 2499500, "two": "1", "four": "1", "ten": "9", "twenty": "9" }
+{ "agg_sum": 12502500, "two": "1", "four": "3", "ten": null, "twenty": null }
+{ "agg_sum": 2500500, "two": "1", "four": "3", "ten": "1", "twenty": null }
+{ "agg_sum": 2500500, "two": "1", "four": "3", "ten": "1", "twenty": "11" }
+{ "agg_sum": 2496500, "two": "1", "four": "3", "ten": "3", "twenty": null }
+{ "agg_sum": 2496500, "two": "1", "four": "3", "ten": "3", "twenty": "3" }
+{ "agg_sum": 2502500, "two": "1", "four": "3", "ten": "5", "twenty": null }
+{ "agg_sum": 2502500, "two": "1", "four": "3", "ten": "5", "twenty": "15" }
+{ "agg_sum": 2498500, "two": "1", "four": "3", "ten": "7", "twenty": null }
+{ "agg_sum": 2498500, "two": "1", "four": "3", "ten": "7", "twenty": "7" }
+{ "agg_sum": 2504500, "two": "1", "four": "3", "ten": "9", "twenty": null }
+{ "agg_sum": 2504500, "two": "1", "four": "3", "ten": "9", "twenty": "19" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/numeric/add_double/add_double.1.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/numeric/add_double/add_double.1.ast
index 4550cfe..7e1443c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/numeric/add_double/add_double.1.ast
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/numeric/add_double/add_double.1.ast
@@ -97,12 +97,7 @@
LiteralExpr [STRING] [-6.5d]
]
+
- IndexAccessor [
- OrderedListConstructor [
- LiteralExpr [DOUBLE] [1.0]
- ]
- Index: - LiteralExpr [LONG] [1]
- ]
+ LiteralExpr [MISSING]
]
)
]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
index cd1fb12..2456f13 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -17,12 +17,28 @@
! specific language governing permissions and limitations
! under the License.
!-->
-<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp">
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
<test-group name="external-dataset">
<test-case FilePath="external-dataset">
<compilation-unit name="aws/s3/000">
<output-dir compare="Text">aws/s3/000</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/001">
+ <output-dir compare="Text">aws/s3/001</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/002">
+ <output-dir compare="Text">aws/s3/002</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/negative">
+ <output-dir compare="Text">aws/s3/negative</output-dir>
+ <expected-error>Parameter(s) format must be specified</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index ec6c24f..bcaa397 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -13141,6 +13141,18 @@
</compilation-unit>
</test-case>
</test-group>
+ <test-group name="csv-tsv-parser">
+ <test-case FilePath="csv-tsv-parser">
+ <compilation-unit name="csv-parser-001">
+ <output-dir compare="Text">csv-parser-001</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="csv-tsv-parser">
+ <compilation-unit name="tsv-parser-001">
+ <output-dir compare="Text">tsv-parser-001</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
<test-group name="binary">
<test-case FilePath="binary">
<compilation-unit name="parse">
@@ -13234,6 +13246,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="union">
+ <compilation-unit name="union_nested">
+ <output-dir compare="Text">union_nested</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="union">
<compilation-unit name="union_orderby">
<output-dir compare="Text">union_orderby</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 7fef69e..4e10498 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -248,7 +248,7 @@
public static final int LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_NAME = 3047;
public static final int OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME = 3048;
public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER = 3049;
- public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE = 3050;
+ public static final int PARSER_INVALID_CHAR_LENGTH = 3050;
public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH = 3051;
public static final int INDEXING_EXTERNAL_FILE_INDEX_ACCESSOR_UNABLE_TO_FIND_FILE_INDEX = 3052;
public static final int PARSER_ADM_DATA_PARSER_FIELD_NOT_NULL = 3053;
@@ -314,7 +314,9 @@
public static final int FAILED_TO_PARSE_METADATA = 3115;
public static final int INPUT_DECODE_FAILURE = 3116;
public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
- public static final int METADATA_DROP_LIBRARY_IN_USE = 3118;
+ public static final int PARAMETERS_REQUIRED = 3118;
+ public static final int MALFORMED_RECORD = 3119;
+ public static final int METADATA_DROP_LIBRARY_IN_USE = 3120;
// Lifecycle management errors
public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 4461ff1..48d849f 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -246,7 +246,7 @@
3047 = External %1$s not supported
3048 = Invalid feed runtime: %1$s
3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1
-3050 = '%1$s' is not a valid quote. The length of a quote should be 1
+3050 = '%1$s' is not a valid %2$s. The length of %2$s should be 1
3051 = Quote '%1$s' cannot be used with the delimiter '%2$s'
3052 = Was not able to find a file in the files index
3053 = Field %1$s can not be null
@@ -312,7 +312,9 @@
3115 = Failed to parse record metadata
3116 = Failed to decode input
3117 = Failed to parse record, malformed log record
-3118 = Library %1$s is being used. It cannot be dropped
+3118 = Parameter(s) %1$s must be specified
+3119 = Record number %1$s is malformed
+3120 = Library %1$s is being used. It cannot be dropped
# Lifecycle management errors
4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-doc/src/main/markdown/sqlpp/2_expr.md b/asterixdb/asterix-doc/src/main/markdown/sqlpp/2_expr.md
index 37e7f79..2e0b526 100644
--- a/asterixdb/asterix-doc/src/main/markdown/sqlpp/2_expr.md
+++ b/asterixdb/asterix-doc/src/main/markdown/sqlpp/2_expr.md
@@ -218,8 +218,9 @@
single element from an array, or a whole subset of an array. Accessing a single element is achieved by
providing a single index argument (zero-based element position), while obtaining a subset of an array is achieved by
providing the `start` and `end` (zero-based) index positions; the returned subset is from position `start` to position
-`end - 1`; the `end` position argument is optional. Multisets have similar behavior to arrays, except for retrieving
-arbitrary items as the order of items is not fixed in multisets.
+`end - 1`; the `end` position argument is optional. If a position argument is negative then the element position is
+counted from the end of the array (`-1` addresses the last element, `-2` next to last, and so on). Multisets have
+similar behavior to arrays, except for retrieving arbitrary items as the order of items is not fixed in multisets.
Attempts to access non-existent fields or out-of-bound array elements produce the special value `MISSING`. Type errors
will be raised for inappropriate use of a path expression, such as applying a field accessor to a numeric value.
@@ -232,12 +233,16 @@
({"name": "MyABCs", "array": [ "a", "b", "c"]}).array
(["a", "b", "c"])[2]
+
+ (["a", "b", "c"])[-1]
({"name": "MyABCs", "array": [ "a", "b", "c"]}).array[2]
(["a", "b", "c"])[0:2]
(["a", "b", "c"])[0:]
+
+ (["a", "b", "c"])[-2:-1]
## <a id="Primary_expressions">Primary Expressions</a>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index f830376..7702dde 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -205,7 +205,7 @@
IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
if (recordReaderClazz != null) {
StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
- streamReader.configure(createInputStream(ctx, partition, indexer), configuration);
+ streamReader.configure(ctx, createInputStream(ctx, partition, indexer), configuration);
if (indexer != null) {
return new IndexingStreamRecordReader(streamReader, indexer);
} else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index 65ecd8d..aa4abb4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -134,4 +134,10 @@
strValue.getChars(0, strValue.length(), value, 0);
this.size = strValue.length();
}
+
+ public boolean isEmptyRecord() {
+ return size <= 0
+ || (size == 1 && (value[0] == ExternalDataConstants.LF || value[0] == ExternalDataConstants.CR))
+ || (size == 2 && value[0] == ExternalDataConstants.CR && value[1] == ExternalDataConstants.LF);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
index 8255ebb..5c8f219 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
@@ -41,7 +41,7 @@
this.cursor = new FieldCursorForDelimitedDataParser(null, delimiter, ExternalDataConstants.QUOTE);
this.record = new CharArrayRecord();
this.valueIndex = valueIndex;
- this.recordWithMetadata = new RecordWithMetadataAndPK<char[]>(record, metaType.getFieldTypes(), recordType,
+ this.recordWithMetadata = new RecordWithMetadataAndPK<>(record, metaType.getFieldTypes(), recordType,
keyIndicator, keyIndexes, keyTypes);
}
@@ -53,16 +53,15 @@
int i = 0;
int j = 0;
while (cursor.nextField()) {
- if (cursor.isDoubleQuoteIncludedInThisField) {
- cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
- cursor.fEnd -= cursor.doubleQuoteCount;
- cursor.isDoubleQuoteIncludedInThisField = false;
+ if (cursor.fieldHasDoubleQuote()) {
+ cursor.eliminateDoubleQuote();
}
if (i == valueIndex) {
- record.setValue(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+ record.setValue(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength());
record.endRecord();
} else {
- recordWithMetadata.setRawMetadata(j, cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+ recordWithMetadata.setRawMetadata(j, cursor.getBuffer(), cursor.getFieldStart(),
+ cursor.getFieldLength());
j++;
}
i++;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index a9f7898..6b8bb59 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -27,6 +27,7 @@
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
@@ -35,7 +36,6 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -77,8 +77,7 @@
}
@Override
- public void configure(IServiceContext ctx, Map<String, String> configuration)
- throws AlgebricksException, HyracksDataException {
+ public void configure(IServiceContext ctx, Map<String, String> configuration) throws AlgebricksException {
this.configuration = configuration;
ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
@@ -115,13 +114,13 @@
*
* @return A list of string paths that point to files only
*
- * @throws HyracksDataException HyracksDataException
+ * @throws AsterixException AsterixException
*/
- private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws HyracksDataException {
+ private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws AsterixException {
List<S3Object> filesOnly = new ArrayList<>();
String fileExtension = getFileExtension(fileFormat);
if (fileExtension == null) {
- throw HyracksDataException.create(ErrorCode.INVALID_FORMAT);
+ throw AsterixException.create(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, fileFormat);
}
s3Objects.stream().filter(object -> object.key().endsWith(fileExtension)).forEach(filesOnly::add);
@@ -214,8 +213,12 @@
*/
private String getFileExtension(String format) {
switch (format.toLowerCase()) {
- case "json":
+ case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
return ".json";
+ case ExternalDataConstants.FORMAT_CSV:
+ return ".csv";
+ case ExternalDataConstants.FORMAT_TSV:
+ return ".tsv";
default:
return null;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
index e78783a..6484d4e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
@@ -18,20 +18,16 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class AwsS3ReaderFactory extends StreamRecordReaderFactory {
@@ -73,18 +69,4 @@
// record reader
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
}
-
- @Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- try {
- StreamRecordReader streamRecordReader =
- (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
- streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
- return streamRecordReader;
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException
- | NoSuchMethodException e) {
- throw HyracksDataException.create(e);
- }
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 07b6250..24a68a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -26,6 +26,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
@@ -135,7 +136,7 @@
}
@Override
- public void configure(AsterixInputStream inputStream, Map<String, String> config) {
+ public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) {
super.configure(inputStream);
this.config = config;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 0b41d4b..a27397e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class LineRecordReader extends StreamRecordReader {
@@ -36,15 +37,18 @@
protected int newlineLength;
protected int recordNumber = 0;
protected boolean nextIsHeader = false;
- private static final List<String> recordReaderFormats = Collections.unmodifiableList(
- Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
+ private static final List<String> recordReaderFormats =
+ Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT,
+ ExternalDataConstants.FORMAT_CSV, ExternalDataConstants.FORMAT_TSV));
private static final String REQUIRED_CONFIGS = "";
@Override
- public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
+ public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
+ throws HyracksDataException {
super.configure(inputStream);
this.hasHeader = ExternalDataUtils.hasHeader(config);
if (hasHeader) {
+ // TODO(ali): revisit this and notifyNewSource
inputStream.setNotificationHandler(this);
}
}
@@ -99,13 +103,16 @@
startPosn = bufferPosn = 0;
bufferLength = reader.read(inputBuffer);
if (bufferLength <= 0) {
- if (readLength > 0) {
- record.endRecord();
- recordNumber++;
- return true;
+ if (readLength <= 0) {
+ close();
+ return false; //EOF
}
- close();
- return false; //EOF
+ record.endRecord();
+ if (record.isEmptyRecord()) {
+ return false;
+ }
+ recordNumber++;
+ return true;
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
@@ -129,6 +136,9 @@
record.append(inputBuffer, startPosn, readLength);
}
} while (newlineLength == 0);
+ if (record.isEmptyRecord()) {
+ continue;
+ }
if (nextIsHeader) {
nextIsHeader = false;
continue;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 4c4128a..564df4b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -24,29 +24,35 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
public class QuotedLineRecordReader extends LineRecordReader {
private char quote;
- private boolean prevCharEscape;
- private boolean inQuote;
+ private char quoteEscape;
+ private IWarningCollector warningCollector;
+ private final SourceLocation srcLoc = new SourceLocation(-1, -1);
private static final List<String> recordReaderFormats = Collections.unmodifiableList(
Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
- private static final String REQUIRED_CONFIGS = "quote";
+ private static final String REQUIRED_CONFIGS = ExternalDataConstants.KEY_QUOTE;
@Override
- public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
- super.configure(inputStream, config);
+ public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
+ throws HyracksDataException {
+ super.configure(ctx, inputStream, config);
+ this.warningCollector = ctx.getWarningCollector();
String quoteString = config.get(ExternalDataConstants.KEY_QUOTE);
- if (quoteString.length() != 1) {
- throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE,
- ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
- }
+ ExternalDataUtils.validateQuote(quoteString);
this.quote = quoteString.charAt(0);
+ this.quoteEscape = ExternalDataUtils.validateGetQuoteEscape(config);
}
@Override
@@ -67,31 +73,41 @@
}
newlineLength = 0;
prevCharCR = false;
- prevCharEscape = false;
+ boolean prevCharEscape = false;
record.reset();
int readLength = 0;
- inQuote = false;
+ boolean inQuote = false;
do {
int startPosn = bufferPosn;
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = reader.read(inputBuffer);
if (bufferLength <= 0) {
- {
- if (readLength > 0) {
- if (inQuote) {
- throw new IOException("malformed input record ended inside quote");
- }
- record.endRecord();
- recordNumber++;
- return true;
+ // reached end of stream
+ if (readLength <= 0 || inQuote) {
+ // haven't read anything previously OR hit end-of-stream while still inside an open quote
+ if (inQuote && warningCollector.shouldWarn()) {
+ warningCollector
+ .warn(WarningUtil.forAsterix(srcLoc, ErrorCode.MALFORMED_RECORD, recordNumber));
}
close();
return false;
}
+ record.endRecord();
+ if (record.isEmptyRecord()) {
+ return false;
+ }
+ recordNumber++;
+ return true;
}
}
+ boolean maybeInQuote = false;
for (; bufferPosn < bufferLength; ++bufferPosn) {
+ if (inputBuffer[bufferPosn] == quote && quoteEscape == quote) {
+ inQuote |= maybeInQuote;
+ prevCharEscape |= maybeInQuote;
+ }
+ maybeInQuote = false;
if (!inQuote) {
if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
newlineLength = (prevCharCR) ? 2 : 1;
@@ -103,24 +119,22 @@
break;
}
prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
- if (inputBuffer[bufferPosn] == quote) {
- if (!prevCharEscape) {
- inQuote = true;
- }
+ if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
+ // this is an opening quote
+ inQuote = true;
}
- if (prevCharEscape) {
- prevCharEscape = false;
- } else {
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
- }
+ // the check quoteEscape != quote ensures an opening quote is not itself treated as an escape
+ prevCharEscape =
+ inputBuffer[bufferPosn] == quoteEscape && !prevCharEscape && quoteEscape != quote;
} else {
- // only look for next quote
- if (inputBuffer[bufferPosn] == quote) {
- if (!prevCharEscape) {
- inQuote = false;
- }
+ // if quote == quoteEscape and current char is quote, then it could be closing or escaping
+ if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
+ // this is most likely a closing quote. the outcome depends on the next char
+ inQuote = false;
+ maybeInQuote = true;
}
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ prevCharEscape =
+ inputBuffer[bufferPosn] == quoteEscape && !prevCharEscape && quoteEscape != quote;
}
}
readLength = bufferPosn - startPosn;
@@ -131,6 +145,9 @@
record.append(inputBuffer, startPosn, readLength);
}
} while (newlineLength == 0);
+ if (record.isEmptyRecord()) {
+ continue;
+ }
if (nextIsHeader) {
nextIsHeader = false;
continue;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 883f0ee..5ab5730 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SemiStructuredRecordReader extends StreamRecordReader {
@@ -45,7 +46,8 @@
private static final String REQUIRED_CONFIGS = "";
@Override
- public void configure(AsterixInputStream stream, Map<String, String> config) throws HyracksDataException {
+ public void configure(IHyracksTaskContext ctx, AsterixInputStream stream, Map<String, String> config)
+ throws HyracksDataException {
super.configure(stream);
String recStartString = config.get(ExternalDataConstants.KEY_RECORD_START);
String recEndString = config.get(ExternalDataConstants.KEY_RECORD_END);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 5629f48..4aed741 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -31,6 +31,7 @@
import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler {
@@ -100,6 +101,6 @@
public abstract String getRequiredConfigs();
- public abstract void configure(AsterixInputStream inputStream, Map<String, String> config)
+ public abstract void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
throws HyracksDataException;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 076842e..9bb50f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -95,7 +95,7 @@
try {
StreamRecordReader streamRecordReader =
(StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
- streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
+ streamRecordReader.configure(ctx, streamFactory.createInputStream(ctx, partition), configuration);
return streamRecordReader;
} catch (InstantiationException | IllegalAccessException | InvocationTargetException
| NoSuchMethodException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index 8e166c0..d35ad26 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -68,7 +68,7 @@
}
@Override
- public int read(char cbuf[], int offset, int length) throws IOException {
+ public int read(char[] cbuf, int offset, int length) throws IOException {
if (done) {
return -1;
}
@@ -84,7 +84,9 @@
} else {
// need to read more data
System.arraycopy(bytes, byteBuffer.position(), bytes, 0, byteBuffer.remaining());
+ len = 0; // reset to read more bytes
byteBuffer.position(byteBuffer.remaining());
+ byteBuffer.limit(byteBuffer.capacity()); // set limit to capacity to make room for the new bytes
while (len == 0) {
len = in.read(bytes, byteBuffer.position(), bytes.length - byteBuffer.position());
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index fb9a4a6..0fe81cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -137,6 +137,11 @@
@Override
public boolean handleException(Throwable th) {
+ if (th instanceof RuntimeException) {
+ // Log the unexpected runtime exception before attempting to recover
+ //TODO Should we continue anyway?
+ LOGGER.error("Encountered an unexpected error", th);
+ }
try {
return accept();
} catch (IOException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 4e371c8..505acbd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -35,6 +35,7 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
@@ -43,23 +44,24 @@
public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
+ private final IHyracksTaskContext ctx;
private final char fieldDelimiter;
private final char quote;
private final boolean hasHeader;
- private ARecordType recordType;
- private IARecordBuilder recBuilder;
- private ArrayBackedValueStorage fieldValueBuffer;
- private DataOutput fieldValueBufferOutput;
- private IValueParser[] valueParsers;
+ private final ARecordType recordType;
+ private final IARecordBuilder recBuilder;
+ private final ArrayBackedValueStorage fieldValueBuffer;
+ private final DataOutput fieldValueBufferOutput;
+ private final IValueParser[] valueParsers;
private FieldCursorForDelimitedDataParser cursor;
- private byte[] fieldTypeTags;
- private int[] fldIds;
- private ArrayBackedValueStorage[] nameBuffers;
- private boolean areAllNullFields;
+ private final byte[] fieldTypeTags;
+ private final int[] fldIds;
+ private final ArrayBackedValueStorage[] nameBuffers;
- public DelimitedDataParser(IValueParserFactory[] valueParserFactories, char fieldDelimter, char quote,
- boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
- this.fieldDelimiter = fieldDelimter;
+ public DelimitedDataParser(IHyracksTaskContext ctx, IValueParserFactory[] valueParserFactories, char fieldDelimiter,
+ char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
+ this.ctx = ctx;
+ this.fieldDelimiter = fieldDelimiter;
this.quote = quote;
this.hasHeader = hasHeader;
this.recordType = recordType;
@@ -98,7 +100,7 @@
}
}
if (!isStreamParser) {
- cursor = new FieldCursorForDelimitedDataParser(null, fieldDelimiter, quote);
+ cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote);
}
}
@@ -107,10 +109,8 @@
try {
while (cursor.nextRecord()) {
parseRecord();
- if (!areAllNullFields) {
- recBuilder.write(out, true);
- return true;
- }
+ recBuilder.write(out, true);
+ return true;
}
return false;
} catch (IOException e) {
@@ -121,7 +121,6 @@
private void parseRecord() throws HyracksDataException {
recBuilder.reset(recordType);
recBuilder.init();
- areAllNullFields = true;
for (int i = 0; i < valueParsers.length; ++i) {
try {
@@ -134,27 +133,24 @@
fieldValueBuffer.reset();
try {
- if (cursor.fStart == cursor.fEnd && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.STRING
+ if (cursor.isFieldEmpty() && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.STRING
&& recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.NULL) {
// if the field is empty and the type is optional, insert
// NULL. Note that string type can also process empty field as an
// empty string
if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
- throw new RuntimeDataException(ErrorCode.PARSER_DELIMITED_NONOPTIONAL_NULL, cursor.recordCount,
- cursor.fieldCount);
+ throw new RuntimeDataException(ErrorCode.PARSER_DELIMITED_NONOPTIONAL_NULL,
+ cursor.getRecordCount(), cursor.getFieldCount());
}
fieldValueBufferOutput.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
} else {
fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
- // Eliminate doule quotes in the field that we are going to parse
- if (cursor.isDoubleQuoteIncludedInThisField) {
- cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
- cursor.fEnd -= cursor.doubleQuoteCount;
- cursor.isDoubleQuoteIncludedInThisField = false;
+ // Eliminate double quotes in the field that we are going to parse
+ if (cursor.fieldHasDoubleQuote()) {
+ cursor.eliminateDoubleQuote();
}
- valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart,
+ valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength(),
fieldValueBufferOutput);
- areAllNullFields = false;
}
if (fldIds[i] < 0) {
recBuilder.addField(nameBuffers[i], fieldValueBuffer);
@@ -165,19 +161,16 @@
throw HyracksDataException.create(e);
}
}
+ if (valueParsers.length != cursor.getFieldCount()) {
+ throw new HyracksDataException("Record #" + cursor.getRecordCount() + " is missing some fields");
+ }
}
@Override
public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
- try {
- cursor.nextRecord(record.get(), record.size());
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
+ cursor.nextRecord(record.get(), record.size());
parseRecord();
- if (!areAllNullFields) {
- recBuilder.write(out, true);
- }
+ recBuilder.write(out, true);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
index f406729..46c5152 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -21,10 +21,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
@@ -40,19 +37,20 @@
private static final long serialVersionUID = 1L;
private static final List<String> parserFormats =
- Collections.unmodifiableList(Arrays.asList("csv", "delimited-text"));
+ Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_CSV,
+ ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_TSV));
@Override
public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
- return createParser();
+ return createParser(ctx);
}
- private DelimitedDataParser createParser() throws HyracksDataException {
+ private DelimitedDataParser createParser(IHyracksTaskContext ctx) throws HyracksDataException {
IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType);
- Character delimiter = DelimitedDataParserFactory.getDelimiter(configuration);
- char quote = DelimitedDataParserFactory.getQuote(configuration, delimiter);
+ char delimiter = ExternalDataUtils.validateGetDelimiter(configuration);
+ char quote = ExternalDataUtils.validateGetQuote(configuration, delimiter);
boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
- return new DelimitedDataParser(valueParserFactories, delimiter, quote, hasHeader, recordType,
+ return new DelimitedDataParser(ctx, valueParserFactories, delimiter, quote, hasHeader, recordType,
ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM));
}
@@ -64,41 +62,7 @@
@Override
public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- return createParser();
- }
-
- // Get a delimiter from the given configuration
- public static char getDelimiter(Map<String, String> configuration) throws HyracksDataException {
- String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
- if (delimiterValue == null) {
- delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
- } else if (delimiterValue.length() != 1) {
- throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER,
- delimiterValue);
- }
- return delimiterValue.charAt(0);
- }
-
- // Get a quote from the given configuration when the delimiter is given
- // Need to pass delimiter to check whether they share the same character
- public static char getQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
- String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
- if (quoteValue == null) {
- quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
- } else if (quoteValue.length() != 1) {
- throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE,
- quoteValue);
- }
-
- // Since delimiter (char type value) can't be null,
- // we only check whether delimiter and quote use the same character
- if (quoteValue.charAt(0) == delimiter) {
- throw new RuntimeDataException(
- ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quoteValue,
- delimiter);
- }
-
- return quoteValue.charAt(0);
+ return createParser(ctx);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
index d34b5e0..704ea29 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -55,7 +55,7 @@
String recordFormat = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT);
if (recordFormat == null) {
throw AlgebricksException.create(ErrorCode.UNKNOWN_RECORD_FORMAT_FOR_META_PARSER,
- ExternalDataConstants.KEY_FORMAT);
+ ExternalDataConstants.KEY_RECORD_FORMAT);
}
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
if (format == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 414c460..27ac10e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -27,7 +27,7 @@
import org.apache.asterix.external.api.IIndexingAdapterFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -39,11 +39,15 @@
*/
public class AdapterFactoryProvider {
- // Adapters
+ private AdapterFactoryProvider() {
+ }
+
+ // get adapter factory. this method has the side effect of modifying the configuration as necessary
public static ITypedAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName,
Map<String, String> configuration, ARecordType itemType, ARecordType metaType)
throws HyracksDataException, AlgebricksException {
- ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
+ ExternalDataUtils.defaultConfiguration(configuration);
+ ExternalDataUtils.prepare(adapterName, configuration);
ICcApplicationContext context = (ICcApplicationContext) serviceCtx.getApplicationContext();
ITypedAdapterFactory adapterFactory =
(ITypedAdapterFactory) context.getAdapterFactoryService().createAdapterFactory();
@@ -53,11 +57,12 @@
return adapterFactory;
}
- // Indexing Adapters
+ // get indexing adapter factory. this method has the side effect of modifying the configuration as necessary
public static IIndexingAdapterFactory getIndexingAdapterFactory(IServiceContext serviceCtx, String adapterName,
Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp,
ARecordType metaType) throws HyracksDataException, AlgebricksException {
- ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
+ ExternalDataUtils.defaultConfiguration(configuration);
+ ExternalDataUtils.prepare(adapterName, configuration);
GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
adapterFactory.setOutputType(itemType);
adapterFactory.setMetaType(metaType);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 53cf6b1..2265a25 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -46,15 +46,12 @@
public static IDataParserFactory getDataParserFactory(ILibraryManager libraryManager,
Map<String, String> configuration) throws AsterixException {
IDataParserFactory parserFactory;
- String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+ String parserFactoryName = configuration.get(ExternalDataConstants.KEY_PARSER);
if (ExternalDataUtils.isExternal(parserFactoryName)) {
return ExternalDataUtils.createExternalParserFactory(libraryManager,
ExternalDataUtils.getDataverse(configuration), parserFactoryName);
} else {
- String parserFactoryKey = ExternalDataUtils.getRecordFormat(configuration);
- if (parserFactoryKey == null) {
- parserFactoryKey = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
- }
+ String parserFactoryKey = ExternalDataUtils.getParserFactory(configuration);
parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
}
return parserFactory;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
index e222e99..8181262 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.util;
-import java.util.Map;
-
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
@@ -30,6 +28,9 @@
public class ExternalDataCompatibilityUtils {
+ private ExternalDataCompatibilityUtils() {
+ }
+
public static void validateCompatibility(IExternalDataSourceFactory dataSourceFactory,
IDataParserFactory dataParserFactory) throws AsterixException {
if (dataSourceFactory.getDataSourceType() != dataParserFactory.getDataSourceType()) {
@@ -58,16 +59,4 @@
+ recordParserFactory.getRecordClass());
}
}
-
- public static void prepare(String adapterName, Map<String, String> configuration) {
- if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
- configuration.put(ExternalDataConstants.KEY_READER, adapterName);
- }
- if (!configuration.containsKey(ExternalDataConstants.KEY_PARSER)) {
- if (configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
- configuration.put(ExternalDataConstants.KEY_PARSER,
- configuration.get(ExternalDataConstants.KEY_FORMAT));
- }
- }
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 26f5402..a05ad77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -18,7 +18,15 @@
*/
package org.apache.asterix.external.util;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
public class ExternalDataConstants {
+
+ private ExternalDataConstants() {
+ }
+
// TODO: Remove unused variables.
/**
* Keys
@@ -62,6 +70,7 @@
public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
public static final String KEY_FORMAT = "format";
public static final String KEY_QUOTE = "quote";
+ public static final String KEY_QUOTE_ESCAPE = "quote-escape";
public static final String KEY_PARSER = "parser";
public static final String KEY_DATASET_RECORD = "dataset-record";
public static final String KEY_HIVE_SERDE = "hive-serde";
@@ -163,6 +172,26 @@
public static final String FORMAT_LINE_SEPARATED = "line-separated";
public static final String FORMAT_HDFS_WRITABLE = "hdfs-writable";
public static final String FORMAT_KV = "kv";
+ public static final String FORMAT_CSV = "csv";
+ public static final String FORMAT_TSV = "tsv";
+ public static final Set<String> ALL_FORMATS;
+ static {
+ Set<String> formats = new HashSet<>(13);
+ formats.add(FORMAT_HIVE);
+ formats.add(FORMAT_BINARY);
+ formats.add(FORMAT_ADM);
+ formats.add(FORMAT_JSON_LOWER_CASE);
+ formats.add(FORMAT_DELIMITED_TEXT);
+ formats.add(FORMAT_TWEET);
+ formats.add(FORMAT_RSS);
+ formats.add(FORMAT_SEMISTRUCTURED);
+ formats.add(FORMAT_LINE_SEPARATED);
+ formats.add(FORMAT_HDFS_WRITABLE);
+ formats.add(FORMAT_KV);
+ formats.add(FORMAT_CSV);
+ formats.add(FORMAT_TSV);
+ ALL_FORMATS = Collections.unmodifiableSet(formats);
+ }
/**
* input streams
@@ -188,6 +217,8 @@
*/
public static final String TRUE = "true";
public static final String FALSE = "false";
+ public static final String TAB_STR = "\t";
+ public static final String NULL_STR = "\0";
/**
* Constant characters
@@ -227,7 +258,6 @@
public static final String EXTERNAL = "external";
public static final String KEY_READER_FACTORY = "reader-factory";
public static final String READER_RSS = "rss_feed";
- public static final String FORMAT_CSV = "csv";
public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
@@ -238,8 +268,5 @@
public static final String CONTAINER_NAME_FIELD_NAME = "container";
public static final String DEFINITION_FIELD_NAME = "definition";
public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
- public static final String[] REQUIRED_LINK_PARAMETERS =
- new String[] { ACCESS_KEY_FIELD_NAME, SECRET_KEY_FIELD_NAME, REGION_FIELD_NAME };
- public static final String[] OPTIONAL_LINK_PARAMETERS = new String[] { SERVICE_END_POINT_FIELD_NAME };
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 8aebd90..652eae5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -38,6 +38,7 @@
import org.apache.asterix.om.types.AUnionType;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -47,46 +48,49 @@
public class ExternalDataUtils {
+ private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
+ static {
+ valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
+ }
+
+ private ExternalDataUtils() {
+ }
+
// Get a delimiter from the given configuration
- public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
+ public static char validateGetDelimiter(Map<String, String> configuration) throws HyracksDataException {
String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
if (delimiterValue == null) {
- delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
- } else if (delimiterValue.length() != 1) {
- throw new AsterixException(
- "'" + delimiterValue + "' is not a valid delimiter. The length of a delimiter should be 1.");
+ return ExternalDataConstants.DEFAULT_DELIMITER.charAt(0);
}
+ validateDelimiter(delimiterValue);
return delimiterValue.charAt(0);
}
// Get a quote from the given configuration when the delimiter is given
// Need to pass delimiter to check whether they share the same character
- public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
+ public static char validateGetQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
if (quoteValue == null) {
- quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
- } else if (quoteValue.length() != 1) {
- throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
+ return ExternalDataConstants.DEFAULT_QUOTE.charAt(0);
}
-
- // Since delimiter (char type value) can't be null,
- // we only check whether delimiter and quote use the same character
- if (quoteValue.charAt(0) == delimiter) {
- throw new AsterixException(
- "Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter + "'. ");
- }
-
- return quoteValue.charAt(0);
+ validateQuote(quoteValue);
+ char quote = quoteValue.charAt(0);
+ validateDelimiterAndQuote(delimiter, quote);
+ return quote;
}
- // Get the header flag
- public static boolean getHasHeader(Map<String, String> configuration) {
- return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_HEADER));
- }
-
- public static void validateParameters(Map<String, String> configuration) throws AsterixException {
- validateDataSourceParameters(configuration);
- validateDataParserParameters(configuration);
+ public static char validateGetQuoteEscape(Map<String, String> configuration) throws HyracksDataException {
+ String quoteEscapeValue = configuration.get(ExternalDataConstants.KEY_QUOTE_ESCAPE);
+ if (quoteEscapeValue == null) {
+ return ExternalDataConstants.ESCAPE;
+ }
+ validateQuoteEscape(quoteEscapeValue);
+ return quoteEscapeValue.charAt(0);
}
public static void validateDataParserParameters(Map<String, String> configuration) throws AsterixException {
@@ -94,8 +98,8 @@
if (parser == null) {
String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
if (parserFactory == null) {
- throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " or "
- + ExternalDataConstants.KEY_PARSER_FACTORY + " must be specified.");
+ throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED,
+ ExternalDataConstants.KEY_FORMAT + " or " + ExternalDataConstants.KEY_PARSER_FACTORY);
}
}
}
@@ -103,7 +107,7 @@
public static void validateDataSourceParameters(Map<String, String> configuration) throws AsterixException {
String reader = configuration.get(ExternalDataConstants.KEY_READER);
if (reader == null) {
- throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
+ throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_READER);
}
}
@@ -149,30 +153,13 @@
return DataverseName.createFromCanonicalForm(configuration.get(ExternalDataConstants.KEY_DATAVERSE));
}
- public static String getRecordFormat(Map<String, String> configuration) {
- String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
- return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
- }
-
- public static void setRecordFormat(Map<String, String> configuration, String format) {
- if (!configuration.containsKey(ExternalDataConstants.KEY_DATA_PARSER)) {
- configuration.put(ExternalDataConstants.KEY_DATA_PARSER, format);
+ public static String getParserFactory(Map<String, String> configuration) {
+ String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER);
+ if (parserFactory != null) {
+ return parserFactory;
}
- if (!configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
- configuration.put(ExternalDataConstants.KEY_FORMAT, format);
- }
- }
-
- private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
-
- private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
- Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class);
- m.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
- m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
- m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
- m.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
- m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
- return m;
+ parserFactory = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ return parserFactory != null ? parserFactory : configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
}
public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
@@ -205,10 +192,6 @@
return vpf;
}
- public static String getRecordReaderStreamName(Map<String, String> configuration) {
- return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
- }
-
public static boolean hasHeader(Map<String, String> configuration) {
String value = configuration.get(ExternalDataConstants.KEY_HEADER);
if (value != null) {
@@ -294,12 +277,6 @@
return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
}
- public static int getQueueSize(Map<String, String> configuration) {
- return configuration.containsKey(ExternalDataConstants.KEY_QUEUE_SIZE)
- ? Integer.parseInt(configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE))
- : ExternalDataConstants.DEFAULT_QUEUE_SIZE;
- }
-
public static boolean isRecordWithMeta(Map<String, String> configuration) {
return configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME);
}
@@ -319,8 +296,7 @@
public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException {
String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
if (keyIndexes == null) {
- throw new AsterixException(
- "A change feed must have the parameter " + ExternalDataConstants.KEY_KEY_INDEXES);
+ throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_KEY_INDEXES);
}
return keyIndexes.split(",").length;
}
@@ -352,4 +328,120 @@
}
return intIndicators;
}
+
+ /**
+ * Fills the configuration of the external dataset and its adapter with default values if not provided by user.
+ *
+ * @param configuration external data configuration
+ */
+ public static void defaultConfiguration(Map<String, String> configuration) {
+ String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ if (format != null) {
+ // default quote, escape character for quote and fields delimiter for csv and tsv format
+ if (format.equals(ExternalDataConstants.FORMAT_CSV)) {
+ configuration.putIfAbsent(ExternalDataConstants.KEY_DELIMITER, ExternalDataConstants.DEFAULT_DELIMITER);
+ configuration.putIfAbsent(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE);
+ configuration.putIfAbsent(ExternalDataConstants.KEY_QUOTE_ESCAPE, ExternalDataConstants.DEFAULT_QUOTE);
+ } else if (format.equals(ExternalDataConstants.FORMAT_TSV)) {
+ configuration.putIfAbsent(ExternalDataConstants.KEY_DELIMITER, ExternalDataConstants.TAB_STR);
+ configuration.putIfAbsent(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.NULL_STR);
+ configuration.putIfAbsent(ExternalDataConstants.KEY_QUOTE_ESCAPE, ExternalDataConstants.NULL_STR);
+ }
+ }
+ }
+
+ /**
+ * Prepares the configuration of the external data and its adapter by filling the information required by
+ * adapters and parsers.
+ *
+ * @param adapterName adapter name
+ * @param configuration external data configuration
+ */
+ public static void prepare(String adapterName, Map<String, String> configuration) {
+ if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
+ configuration.put(ExternalDataConstants.KEY_READER, adapterName);
+ }
+ if (!configuration.containsKey(ExternalDataConstants.KEY_PARSER)
+ && configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
+ configuration.put(ExternalDataConstants.KEY_PARSER, configuration.get(ExternalDataConstants.KEY_FORMAT));
+ }
+ }
+
+ /**
+ * Normalizes the values of certain parameters of the adapter configuration. This should happen before persisting
+ * the metadata (e.g. when creating external datasets or feeds) and when creating an adapter factory.
+ *
+ * @param configuration external data configuration
+ */
+ public static void normalize(Map<String, String> configuration) {
+ // normalize the "format" parameter
+ String paramValue = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ if (paramValue != null) {
+ String lowerCaseFormat = paramValue.toLowerCase().trim();
+ if (ExternalDataConstants.ALL_FORMATS.contains(lowerCaseFormat)) {
+ configuration.put(ExternalDataConstants.KEY_FORMAT, lowerCaseFormat);
+ }
+ }
+ // normalize the "header" parameter
+ paramValue = configuration.get(ExternalDataConstants.KEY_HEADER);
+ if (paramValue != null) {
+ configuration.put(ExternalDataConstants.KEY_HEADER, paramValue.toLowerCase().trim());
+ }
+ }
+
+ /**
+ * Validates the parameter values of the adapter configuration. This should happen after normalizing the values.
+ *
+ * @param configuration external data configuration
+ * @throws HyracksDataException HyracksDataException
+ */
+ public static void validate(Map<String, String> configuration) throws HyracksDataException {
+ String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ String header = configuration.get(ExternalDataConstants.KEY_HEADER);
+ if (format != null && isHeaderRequiredFor(format) && header == null) {
+ throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_HEADER);
+ }
+ if (header != null && !isBoolean(header)) {
+ throw new RuntimeDataException(ErrorCode.INVALID_REQ_PARAM_VAL, ExternalDataConstants.KEY_HEADER, header);
+ }
+ char delimiter = validateGetDelimiter(configuration);
+ validateGetQuote(configuration, delimiter);
+ validateGetQuoteEscape(configuration);
+ }
+
+ private static boolean isHeaderRequiredFor(String format) {
+ return format.equals(ExternalDataConstants.FORMAT_CSV) || format.equals(ExternalDataConstants.FORMAT_TSV);
+ }
+
+ private static boolean isBoolean(String value) {
+ return value.equals(ExternalDataConstants.TRUE) || value.equals(ExternalDataConstants.FALSE);
+ }
+
+ private static void validateDelimiter(String delimiter) throws RuntimeDataException {
+ if (delimiter.length() != 1) {
+ throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER,
+ delimiter);
+ }
+ }
+
+ public static void validateQuote(String quote) throws RuntimeDataException {
+ if (quote.length() != 1) {
+ throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quote,
+ ExternalDataConstants.KEY_QUOTE);
+ }
+ }
+
+ private static void validateQuoteEscape(String quoteEsc) throws RuntimeDataException {
+ if (quoteEsc.length() != 1) {
+ throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quoteEsc,
+ ExternalDataConstants.KEY_QUOTE_ESCAPE);
+ }
+ }
+
+ private static void validateDelimiterAndQuote(char delimiter, char quote) throws RuntimeDataException {
+ if (quote == delimiter) {
+ throw new RuntimeDataException(
+ ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quote, delimiter);
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
index 2ba5a3e..2972542 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
@@ -41,6 +41,8 @@
import org.apache.asterix.external.input.stream.LocalFSInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -56,6 +58,7 @@
private final CharBuffer chars = CharBuffer.allocate(BUFFER_SIZE);
private final CharArrayRecord value = new CharArrayRecord();
private final ByteBuf nettyBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(KB32, Integer.MAX_VALUE);
+ private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
@Test
public void eatGlass() {
@@ -83,7 +86,7 @@
FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
LocalFSInputStream in = new LocalFSInputStream(watcher);
try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader()) {
- recordReader.configure(in, config);
+ recordReader.configure(ctx, in, config);
while (recordReader.hasNext()) {
try {
IRawRecord<char[]> record = recordReader.next();
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 1584065..47c6ffe 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -44,12 +44,15 @@
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Assert;
public class RecordWithMetaTest {
+ private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
private static ARecordType recordType;
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -90,7 +93,7 @@
config.put(ExternalDataConstants.KEY_HEADER, "true");
config.put(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE);
LineRecordReader lineReader = new LineRecordReader();
- lineReader.configure(inputStream, config);
+ lineReader.configure(ctx, inputStream, config);
// create csv with json record reader
CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter(
valueIndex, delimiter, metaType, recordType, pkIndicators, pkIndexes, keyTypes);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 64b703b..f0567a4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -22,6 +22,7 @@
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -64,6 +65,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.compression.CompressionManager;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -115,20 +117,9 @@
String nodeGroupName =
((AString) datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX))
.getStringValue();
- String compactionPolicy = ((AString) datasetRecord
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue();
- IACursor cursor = ((AOrderedList) datasetRecord
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX))
- .getCursor();
- Map<String, String> compactionPolicyProperties = new LinkedHashMap<>();
- while (cursor.next()) {
- ARecord field = (ARecord) cursor.get();
- String key =
- ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
- String value =
- ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
- compactionPolicyProperties.put(key, value);
- }
+
+ Pair<String, Map<String, String>> compactionPolicy = readCompactionPolicy(datasetType, datasetRecord);
+
switch (datasetType) {
case INTERNAL: {
ARecord datasetDetailsRecord = (ARecord) datasetRecord
@@ -139,7 +130,7 @@
PartitioningStrategy partitioningStrategy = PartitioningStrategy.valueOf(((AString) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONSTRATEGY_FIELD_INDEX))
.getStringValue());
- cursor = ((AOrderedList) datasetDetailsRecord
+ IACursor cursor = ((AOrderedList) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONKEY_FIELD_INDEX))
.getCursor();
List<List<String>> partitioningKey = new ArrayList<>();
@@ -199,7 +190,7 @@
String adapter = ((AString) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_DATASOURCE_ADAPTER_FIELD_INDEX))
.getStringValue();
- cursor = ((AOrderedList) datasetDetailsRecord
+ IACursor cursor = ((AOrderedList) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX))
.getCursor();
Map<String, String> properties = new HashMap<>();
@@ -242,10 +233,34 @@
String compressionScheme = getCompressionScheme(datasetRecord);
return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName,
- nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType,
+ nodeGroupName, compactionPolicy.first, compactionPolicy.second, datasetDetails, hints, datasetType,
datasetId, pendingOp, rebalanceCount, compressionScheme);
}
+ protected Pair<String, Map<String, String>> readCompactionPolicy(DatasetType datasetType, ARecord datasetRecord) {
+
+ String compactionPolicy = ((AString) datasetRecord
+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue();
+ AOrderedList compactionPolicyPropertiesList = ((AOrderedList) datasetRecord
+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX));
+
+ Map<String, String> compactionPolicyProperties;
+ if (compactionPolicyPropertiesList.size() > 0) {
+ compactionPolicyProperties = new LinkedHashMap<>();
+ for (IACursor cursor = compactionPolicyPropertiesList.getCursor(); cursor.next();) {
+ ARecord field = (ARecord) cursor.get();
+ String key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
+ .getStringValue();
+ String value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
+ .getStringValue();
+ compactionPolicyProperties.put(key, value);
+ }
+ } else {
+ compactionPolicyProperties = Collections.emptyMap();
+ }
+ return new Pair<>(compactionPolicy, compactionPolicyProperties);
+ }
+
private long getRebalanceCount(ARecord datasetRecord) {
// Read the rebalance count if there is one.
int rebalanceCountIndex =
@@ -323,29 +338,9 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX, fieldValue);
- // write field 6
- fieldValue.reset();
- aString.setValue(dataset.getCompactionPolicy());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue);
-
- // write field 7
- listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE
- .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]);
- if (dataset.getCompactionPolicyProperties() != null) {
- for (Map.Entry<String, String> property : dataset.getCompactionPolicyProperties().entrySet()) {
- String name = property.getKey();
- String value = property.getValue();
- itemValue.reset();
- DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(),
- MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
- listBuilder.addItem(itemValue);
- }
- }
- fieldValue.reset();
- listBuilder.write(fieldValue.getDataOutput(), true);
- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX,
- fieldValue);
+ // write field 6/7
+ writeCompactionPolicy(dataset.getDatasetType(), dataset.getCompactionPolicy(),
+ dataset.getCompactionPolicyProperties(), listBuilder, itemValue);
// write field 8/9
fieldValue.reset();
@@ -395,6 +390,34 @@
return tuple;
}
+ protected void writeCompactionPolicy(DatasetType datasetType, String compactionPolicy,
+ Map<String, String> compactionPolicyProperties, OrderedListBuilder listBuilder,
+ ArrayBackedValueStorage itemValue) throws HyracksDataException {
+ // write field 6
+ fieldValue.reset();
+ aString.setValue(compactionPolicy);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue);
+
+ // write field 7
+ listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE
+ .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]);
+ if (compactionPolicyProperties != null && !compactionPolicyProperties.isEmpty()) {
+ for (Map.Entry<String, String> property : compactionPolicyProperties.entrySet()) {
+ String name = property.getKey();
+ String value = property.getValue();
+ itemValue.reset();
+ DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(),
+ MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
+ listBuilder.addItem(itemValue);
+ }
+ }
+ fieldValue.reset();
+ listBuilder.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX,
+ fieldValue);
+ }
+
/**
* Keep protected to allow other extensions to add additional fields
*/
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 09eaa8b..e0a8e83 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -143,13 +143,13 @@
default:
throw new AsterixException("Unknown Adapter type " + adapterType);
}
- adapterFactory.setOutputType(adapterOutputType);
- adapterFactory.setMetaType(metaType);
- adapterFactory.configure(appCtx.getServiceContext(), configuration);
} else {
- AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(), adapterName, configuration,
- adapterOutputType, metaType);
+ ExternalDataUtils.prepare(adapterName, configuration);
+ adapterFactory = (ITypedAdapterFactory) appCtx.getAdapterFactoryService().createAdapterFactory();
}
+ adapterFactory.setOutputType(adapterOutputType);
+ adapterFactory.setMetaType(metaType);
+ adapterFactory.configure(appCtx.getServiceContext(), configuration);
if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
if (metaType == null) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
index f7bddf1..fb71eae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
@@ -116,6 +116,10 @@
int itemIndex = ATypeHierarchy.getIntegerValue(BuiltinFunctions.GET_ITEM.getName(), 0,
indexBytes, indexOffset);
+ if (itemIndex < 0) {
+ itemIndex = itemCount + itemIndex;
+ }
+
if (itemIndex < 0 || itemIndex >= itemCount) {
// Out-of-bound index access should return MISSING.
result.set(missingBytes, 0, 1);
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index b660379..b794d73 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -169,8 +169,8 @@
<url>https://raw.githubusercontent.com/mojohaus/appassembler/appassembler-2.0.0/LICENSE.txt</url>
</override>
<override>
- <gav>io.netty:netty-all:4.1.46.Final</gav>
- <noticeUrl>https://raw.githubusercontent.com/netty/netty/netty-4.1.46.Final/NOTICE.txt</noticeUrl>
+ <gav>io.netty:netty-all:4.1.48.Final</gav>
+ <noticeUrl>https://raw.githubusercontent.com/netty/netty/netty-4.1.48.Final/NOTICE.txt</noticeUrl>
</override>
<override>
<gav>org.reactivestreams:reactive-streams:1.0.2</gav>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index 812c64a..2d74604 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -160,9 +160,9 @@
<artifactId>netty-all</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.46.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.46.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.46.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.48.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.48.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.25.Final,4.1.32.Final,4.1.42.Final,4.1.48.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.46.Final_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.48.Final_NOTICE.txt
similarity index 100%
rename from asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.46.Final_NOTICE.txt
rename to asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.48.Final_NOTICE.txt
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
index 4deba7b..7baf268 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannel.java
@@ -37,4 +37,9 @@
public void open(IHyracksCommonContext ctx) throws HyracksDataException;
public void close() throws HyracksDataException;
+
+ /**
+ * Called when a failure is encountered while reading data
+ */
+ void fail();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index cfb6c9e..87b963a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -59,7 +59,7 @@
40 = Unknown inverted index type %1$s
41 = Cannot propose linearizer if dimensions have different types
42 = Cannot propose linearizer for type %1$s
-43 = Record size (%1$s) larger than maximum acceptable record size (%2$s)
+43 = Index entry size (%1$s) larger than maximum acceptable entry size (%2$s)
44 = Failed to re-find parent of a page in the tree
45 = Failed to find a tuple in a page
46 = Unsorted load input
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
index c8f5bd9..3c5a3d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSetReader.java
@@ -90,36 +90,43 @@
public int read(IFrame frame) throws HyracksDataException {
frame.reset();
int readSize = 0;
- if (isFirstRead() && !hasNextRecord()) {
- return readSize;
- }
- // read until frame is full or all result records have been read
- while (readSize < frame.getFrameSize()) {
- if (currentRecordMonitor.hasMoreFrames()) {
- final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
- if (readBuffer == null) {
- throw new IllegalStateException("Unexpected empty frame");
- }
- currentRecordMonitor.notifyFrameRead();
- if (readSize == 0) {
- final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
- frame.ensureFrameSize(frame.getMinSize() * nBlocks);
- frame.getBuffer().clear();
- }
- frame.getBuffer().put(readBuffer);
- currentRecordChannel.recycleBuffer(readBuffer);
- readSize = frame.getBuffer().position();
- } else {
- currentRecordChannel.close();
- if (currentRecordMonitor.failed()) {
- throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
- }
- if (isLastRecord() || !hasNextRecord()) {
- break;
+ try {
+ if (isFirstRead() && !hasNextRecord()) {
+ return readSize;
+ }
+ // read until frame is full or all result records have been read
+ while (readSize < frame.getFrameSize()) {
+ if (currentRecordMonitor.hasMoreFrames()) {
+ final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer();
+ if (readBuffer == null) {
+ throw new IllegalStateException("Unexpected empty frame");
+ }
+ currentRecordMonitor.notifyFrameRead();
+ if (readSize == 0) {
+ final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer);
+ frame.ensureFrameSize(frame.getMinSize() * nBlocks);
+ frame.getBuffer().clear();
+ }
+ frame.getBuffer().put(readBuffer);
+ currentRecordChannel.recycleBuffer(readBuffer);
+ readSize = frame.getBuffer().position();
+ } else {
+ currentRecordChannel.close();
+ if (currentRecordMonitor.failed()) {
+ throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId);
+ }
+ if (isLastRecord() || !hasNextRecord()) {
+ break;
+ }
}
}
+ frame.getBuffer().flip();
+ } catch (Exception e) {
+ if (isLocalFailure()) {
+ currentRecordChannel.fail();
+ }
+ throw e;
}
- frame.getBuffer().flip();
return readSize;
}
@@ -201,6 +208,10 @@
return knownRecords != null && currentRecord == knownRecords.length - 1;
}
+ private boolean isLocalFailure() {
+ return currentRecordMonitor != null && !currentRecordMonitor.failed();
+ }
+
private static class ResultInputChannelMonitor implements IInputChannelMonitor {
private int availableFrames;
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 58664c6..53bb7cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -119,6 +119,11 @@
}
+ @Override
+ public void fail() {
+ // do nothing (covered by job lifecycle)
+ }
+
private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
@Override
public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
index 1df39e9..38cf7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ResultNetworkInputChannel.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -127,6 +128,11 @@
}
+ @Override
+ public void fail() {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+ }
+
private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
@Override
public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 83677f8..2a40eb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -99,6 +99,11 @@
}
+ @Override
+ public void fail() {
+ // do nothing (covered by job lifecycle)
+ }
+
private class FrameWriter implements IFrameWriter {
@Override
public void open() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/parsers/BooleanParserFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/parsers/BooleanParserFactory.java
new file mode 100644
index 0000000..488be04
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/parsers/BooleanParserFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class BooleanParserFactory implements IValueParserFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IValueParserFactory INSTANCE = new BooleanParserFactory();
+
+ private BooleanParserFactory() {
+ }
+
+ @Override
+ public IValueParser createValueParser() {
+ return BooleanParserFactory::parse;
+ }
+
+ public static void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+ try {
+ if (length == 4 && (buffer[start] == 't' || buffer[start] == 'T')
+ && (buffer[start + 1] == 'r' || buffer[start + 1] == 'R')
+ && (buffer[start + 2] == 'u' || buffer[start + 2] == 'U')
+ && (buffer[start + 3] == 'e' || buffer[start + 3] == 'E')) {
+ out.writeBoolean(true);
+ return;
+ } else if (length == 5 && (buffer[start] == 'f' || buffer[start] == 'F')
+ && (buffer[start + 1] == 'a' || buffer[start + 1] == 'A')
+ && (buffer[start + 2] == 'l' || buffer[start + 2] == 'L')
+ && (buffer[start + 3] == 's' || buffer[start + 3] == 'S')
+ && (buffer[start + 4] == 'e' || buffer[start + 4] == 'E')) {
+ out.writeBoolean(false);
+ return;
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+
+ throw new HyracksDataException("Invalid input data");
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 9ddb4c2..2eb882a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -76,12 +76,11 @@
break;
}
// Eliminate double quotes in the field that we are going to parse
- if (cursor.isDoubleQuoteIncludedInThisField) {
- cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
- cursor.fEnd -= cursor.doubleQuoteCount;
- cursor.isDoubleQuoteIncludedInThisField = false;
+ if (cursor.fieldHasDoubleQuote()) {
+ cursor.eliminateDoubleQuote();
}
- valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
+ valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength(),
+ dos);
tb.addFieldEndOffset();
}
FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index 7e5ee2c..fd3e4c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -32,18 +32,18 @@
EOF //end of stream reached
}
- public char[] buffer; //buffer to holds the input coming form the underlying input stream
- public int fStart; //start position for field
- public int fEnd; //end position for field
- public int recordCount; //count of records
- public int fieldCount; //count of fields in current record
- public int doubleQuoteCount; //count of double quotes
- public boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes
+ private char[] buffer; //buffer to hold the input coming from the underlying input stream
+ private int fStart; //start position for field
+ private int fEnd; //end position for field
+ private int recordCount; //count of records
+ private int fieldCount; //count of fields in current record
+ private int doubleQuoteCount; //count of double quotes
+ private boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes
private static final int INITIAL_BUFFER_SIZE = 4096;//initial buffer size
private static final int INCREMENT = 4096; //increment size
- private Reader in; //the underlying buffer
+ private final Reader in; //the underlying reader
private int start; //start of valid buffer area
private int end; //end of valid buffer area
@@ -55,8 +55,8 @@
private int quoteCount; //count of single quotes
private boolean startedQuote; //whether a quote has been started
- private char quote; //the quote character
- private char fieldDelimiter; //the delimiter
+ private final char quote; //the quote character
+ private final char fieldDelimiter; //the delimiter
public FieldCursorForDelimitedDataParser(Reader in, char fieldDelimiter, char quote) {
this.in = in;
@@ -70,9 +70,9 @@
state = State.INIT;
this.quote = quote;
this.fieldDelimiter = fieldDelimiter;
- lastDelimiterPosition = -99;
- lastQuotePosition = -99;
- lastDoubleQuotePosition = -99;
+ lastDelimiterPosition = -1;
+ lastQuotePosition = -1;
+ lastDoubleQuotePosition = -1;
quoteCount = 0;
doubleQuoteCount = 0;
startedQuote = false;
@@ -81,9 +81,44 @@
fieldCount = 0;
}
- public void nextRecord(char[] buffer, int recordLength) throws IOException {
+ public char[] getBuffer() {
+ return buffer;
+ }
+
+ public int getFieldStart() {
+ return fStart;
+ }
+
+ public int getFieldLength() {
+ return fEnd - fStart;
+ }
+
+ public boolean isFieldEmpty() {
+ return fStart == fEnd;
+ }
+
+ public boolean fieldHasDoubleQuote() {
+ return isDoubleQuoteIncludedInThisField;
+ }
+
+ public int getFieldCount() {
+ return fieldCount;
+ }
+
+ public int getRecordCount() {
+ return recordCount;
+ }
+
+ public void nextRecord(char[] buffer, int recordLength) {
recordCount++;
fieldCount = 0;
+ lastDelimiterPosition = -1;
+ lastQuotePosition = -1;
+ lastDoubleQuotePosition = -1;
+ quoteCount = 0;
+ doubleQuoteCount = 0;
+ startedQuote = false;
+ isDoubleQuoteIncludedInThisField = false;
start = 0;
end = recordLength;
state = State.IN_RECORD;
@@ -187,7 +222,6 @@
}
public boolean nextField() throws IOException {
- fieldCount++;
switch (state) {
case INIT:
case EOR:
@@ -196,12 +230,12 @@
return false;
case IN_RECORD:
- boolean eof;
+ fieldCount++;
// reset quote related values
startedQuote = false;
isDoubleQuoteIncludedInThisField = false;
- lastQuotePosition = -99;
- lastDoubleQuotePosition = -99;
+ lastQuotePosition = -1;
+ lastDoubleQuotePosition = -1;
quoteCount = 0;
doubleQuoteCount = 0;
@@ -209,21 +243,26 @@
while (true) {
if (p >= end) {
int s = start;
- eof = !readMore();
+ boolean eof = !readMore();
p -= (s - start);
- lastQuotePosition -= (s - start);
- lastDoubleQuotePosition -= (s - start);
- lastDelimiterPosition -= (s - start);
+ lastQuotePosition -= (lastQuotePosition > -1) ? (s - start) : 0;
+ lastDoubleQuotePosition -= (lastDoubleQuotePosition > -1) ? (s - start) : 0;
+ lastDelimiterPosition -= (lastDelimiterPosition > -1) ? (s - start) : 0;
if (eof) {
state = State.EOF;
- if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
- && quoteCount == doubleQuoteCount * 2 + 2) {
- // set the position of fStart to +1, fEnd to -1 to remove quote character
- fStart = start + 1;
- fEnd = p - 1;
- } else {
+ if (!startedQuote) {
fStart = start;
fEnd = p;
+ } else {
+ if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+ && quoteCount == doubleQuoteCount * 2 + 2) {
+ // set the position of fStart to +1, fEnd to -1 to remove quote character
+ fStart = start + 1;
+ fEnd = p - 1;
+ } else {
+ throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
+ + " - missing a closing quote");
+ }
}
return true;
}
@@ -232,12 +271,12 @@
if (ch == quote) {
// If this is first quote in the field, then it needs to be placed in the beginning.
if (!startedQuote) {
- if (lastDelimiterPosition == p - 1 || lastDelimiterPosition == -99) {
+ if (p == start) {
startedQuote = true;
} else {
// In this case, we don't have a quote in the beginning of a field.
throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
- + " - a quote enclosing a field needs to be placed in the beginning of that field.");
+ + " - a quote enclosing a field needs to be placed in the beginning of that field");
}
}
// Check double quotes - "". We check [start != p-2]
@@ -245,8 +284,8 @@
// since it looks like a double quote. However, it's not a double quote.
// (e.g. if field2 has no value:
// field1,"",field3 ... )
- if (lastQuotePosition == p - 1 && lastDelimiterPosition != p - 2
- && lastDoubleQuotePosition != p - 1) {
+ if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+ && lastQuotePosition != start) {
isDoubleQuoteIncludedInThisField = true;
doubleQuoteCount++;
lastDoubleQuotePosition = p;
@@ -262,64 +301,46 @@
start = p + 1;
lastDelimiterPosition = p;
return true;
- } else if (startedQuote) {
- if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1) {
- // There is a quote right before the delimiter (e.g. ",) and it is not two quote,
- // then the field contains a valid string.
- // We set the position of fStart to +1, fEnd to -1 to remove quote character
- fStart = start + 1;
- fEnd = p - 1;
- start = p + 1;
- lastDelimiterPosition = p;
- startedQuote = false;
- return true;
- } else if (lastQuotePosition < p - 1 && lastQuotePosition != lastDoubleQuotePosition
- && quoteCount == doubleQuoteCount * 2 + 2) {
- // There is a quote before the delimiter, however it is not directly placed before the delimiter.
- // In this case, we throw an exception.
- // quoteCount == doubleQuoteCount * 2 + 2 : only true when we have two quotes except double-quotes.
- throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
- + " - A quote enclosing a field needs to be followed by the delimiter.");
- }
+ }
+
+ if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+ && lastQuotePosition != start) {
+ // There is a quote right before the delimiter (e.g. ",) and it is not two quote,
+ // then the field contains a valid string.
+ // We set the position of fStart to +1, fEnd to -1 to remove quote character
+ fStart = start + 1;
+ fEnd = p - 1;
+ start = p + 1;
+ lastDelimiterPosition = p;
+ startedQuote = false;
+ return true;
+ } else if (lastQuotePosition < p - 1 && lastQuotePosition != lastDoubleQuotePosition
+ && quoteCount == doubleQuoteCount * 2 + 2) {
+ // There is a quote before the delimiter, however it is not directly placed before the delimiter.
+ // In this case, we throw an exception.
+ // quoteCount == doubleQuoteCount * 2 + 2 : only true when we have two quotes except double-quotes.
+ throw new IOException("At record: " + recordCount + ", field#: " + fieldCount
+ + " - A quote enclosing a field needs to be followed by the delimiter.");
}
// If the control flow reaches here: we have a delimiter in this field and
// there should be a quote in the beginning and the end of
// this field. So, just continue reading next character
- } else if (ch == '\n') {
+ } else if (ch == '\n' || ch == '\r') {
if (!startedQuote) {
fStart = start;
fEnd = p;
start = p + 1;
- state = State.EOR;
+ state = ch == '\n' ? State.EOR : State.CR;
lastDelimiterPosition = p;
return true;
- } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+ } else if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
&& quoteCount == doubleQuoteCount * 2 + 2) {
// set the position of fStart to +1, fEnd to -1 to remove quote character
fStart = start + 1;
fEnd = p - 1;
lastDelimiterPosition = p;
start = p + 1;
- state = State.EOR;
- startedQuote = false;
- return true;
- }
- } else if (ch == '\r') {
- if (!startedQuote) {
- fStart = start;
- fEnd = p;
- start = p + 1;
- state = State.CR;
- lastDelimiterPosition = p;
- return true;
- } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
- && quoteCount == doubleQuoteCount * 2 + 2) {
- // set the position of fStart to +1, fEnd to -1 to remove quote character
- fStart = start + 1;
- fEnd = p - 1;
- lastDelimiterPosition = p;
- start = p + 1;
- state = State.CR;
+ state = ch == '\n' ? State.EOR : State.CR;
startedQuote = false;
return true;
}
@@ -330,7 +351,10 @@
throw new IllegalStateException();
}
- protected boolean readMore() throws IOException {
+ private boolean readMore() throws IOException {
+ if (in == null) {
+ return false;
+ }
if (start > 0) {
System.arraycopy(buffer, start, buffer, 0, end - start);
}
@@ -350,10 +374,11 @@
}
// Eliminate escaped double quotes("") in a field
- public void eliminateDoubleQuote(char[] buffer, int start, int length) {
- int lastDoubleQuotePosition = -99;
- int writepos = start;
- int readpos = start;
+ public void eliminateDoubleQuote() {
+ int lastDoubleQuotePosition = -1;
+ int writepos = fStart;
+ int readpos = fStart;
+ int length = fEnd - fStart;
// Find positions where double quotes appear
for (int i = 0; i < length; i++) {
// Skip double quotes
@@ -369,5 +394,7 @@
readpos++;
}
}
+ fEnd -= doubleQuoteCount;
+ isDoubleQuoteIncludedInThisField = false;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
index e663179..8edcafc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/file/CursorTest.java
@@ -51,9 +51,8 @@
while (cursor.nextRecord()) {
int fieldNumber = 0;
while (cursor.nextField()) {
- if (cursor.isDoubleQuoteIncludedInThisField) {
- cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
- cursor.fEnd -= cursor.doubleQuoteCount;
+ if (cursor.fieldHasDoubleQuote()) {
+ cursor.eliminateDoubleQuote();
}
fieldNumber++;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 741ca8c..ee19de3 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -75,14 +75,14 @@
@GuardedBy("ChannelControlBlock")
private boolean computeWritability() {
- boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+ if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+ return true;
+ }
+ boolean writableDataPresent = !ecodeSent && (currentWriteBuffer != null || !wiFullQueue.isEmpty());
if (writableDataPresent) {
return credits > 0;
}
- if (isPendingCloseWrite()) {
- return true;
- }
- return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
+ return isPendingCloseWrite();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index a7be3a6..e542a34 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -41,7 +41,15 @@
if (currentWriteBuffer == null) {
currentWriteBuffer = wiFullQueue.poll();
}
- if (currentWriteBuffer != null) {
+ if (!ecodeSent && ecode.get() == REMOTE_ERROR_CODE) {
+ writerState.getCommand().setChannelId(channelId);
+ writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
+ writerState.getCommand().setData(ecode.get());
+ writerState.reset(null, 0, null);
+ ecodeSent = true;
+ ccb.reportLocalEOS();
+ adjustChannelWritability();
+ } else if (currentWriteBuffer != null) {
int size = Math.min(currentWriteBuffer.remaining(), credits);
if (size > 0) {
credits -= size;
@@ -55,14 +63,6 @@
} else {
adjustChannelWritability();
}
- } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
- writerState.getCommand().setChannelId(channelId);
- writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
- writerState.getCommand().setData(REMOTE_ERROR_CODE);
- writerState.reset(null, 0, null);
- ecodeSent = true;
- ccb.reportLocalEOS();
- adjustChannelWritability();
} else if (isPendingCloseWrite()) {
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 5e922b1..81e528b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -288,13 +288,17 @@
// Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
compressedPageWriter.abort();
for (NodeFrontier nodeFrontier : nodeFrontiers) {
- ICachedPage frontierPage = nodeFrontier.page;
- if (frontierPage.confiscated()) {
- bufferCache.returnPage(frontierPage, false);
+ if (nodeFrontier != null && nodeFrontier.page != null) {
+ ICachedPage frontierPage = nodeFrontier.page;
+ if (frontierPage.confiscated()) {
+ bufferCache.returnPage(frontierPage, false);
+ }
}
}
for (ICachedPage pageToDiscard : pagesToWrite) {
- bufferCache.returnPage(pageToDiscard, false);
+ if (pageToDiscard != null) {
+ bufferCache.returnPage(pageToDiscard, false);
+ }
}
releasedLatches = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
index 9217415..3226786 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java
@@ -83,22 +83,21 @@
public void prepareWrite(ICachedPage cPage) throws HyracksDataException {
final ICachedPageInternal internalPage = (ICachedPageInternal) cPage;
final int entryPageId = getLAFEntryPageId(BufferedFileHandle.getPageId(internalPage.getDiskPageId()));
-
- if (!cachedFrames.containsKey(entryPageId)) {
- try {
- //Writing new page(s). Confiscate the page(s) from the buffer cache.
- prepareFrames(entryPageId, internalPage);
- } catch (HyracksDataException e) {
- abort();
- throw e;
- }
+ try {
+ //Writing new page(s). Confiscate the page(s) from the buffer cache.
+ prepareFrames(entryPageId, internalPage);
+ } catch (HyracksDataException e) {
+ abort();
+ throw e;
}
}
private void prepareFrames(int entryPageId, ICachedPageInternal cPage) throws HyracksDataException {
- //Confiscate the first page
- confiscatePage(entryPageId);
- //check if extra pages spans to the next entry page
+ // check if we need to confiscate a page for the main page
+ if (!cachedFrames.containsKey(entryPageId)) {
+ confiscatePage(entryPageId);
+ }
+ // check if extra pages span to the next entry page
for (int i = 0; i < cPage.getFrameSizeMultiplier() - 1; i++) {
final int extraEntryPageId = getLAFEntryPageId(cPage.getExtraBlockPageId() + i);
if (!cachedFrames.containsKey(extraEntryPageId)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index e44480c..3f78234 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -51,6 +51,7 @@
public class TestUtils {
+ private static final int DEFAULT_FRAME_SIZE = 32768;
public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() {
@Override
public void warn(Warning warning) {
@@ -68,6 +69,10 @@
}
};
+ public static IHyracksTaskContext createHyracksTask() {
+ return create(DEFAULT_FRAME_SIZE);
+ }
+
public static IHyracksTaskContext create(int frameSize) {
IOManager ioManager = null;
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 1fcd806..65567b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -115,6 +115,17 @@
return hosts;
}
+ public static String defaultPort(String maybeHostPort, int defaultPort) {
+ String encodedInput = encodeIPv6LiteralHost(maybeHostPort);
+ int lastColon = encodedInput.lastIndexOf(':');
+ int closeBracket = encodedInput.lastIndexOf(']');
+ if (lastColon > 0 && lastColon > closeBracket) {
+ return maybeHostPort;
+ } else {
+ return encodedInput + ":" + defaultPort;
+ }
+ }
+
public static String encodeIPv6LiteralHost(String hostname) {
return InetAddressUtils.isIPv6Address(hostname) ? "[" + hostname + "]" : hostname;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/NetworkUtilTest.java b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/NetworkUtilTest.java
new file mode 100644
index 0000000..c5d42c5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/test/java/org/apache/hyracks/util/NetworkUtilTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NetworkUtilTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ @Test
+ public void testDefaultPort() {
+ Assert.assertEquals("127.0.0.1:1234", NetworkUtil.defaultPort("127.0.0.1:1234", 9999));
+ Assert.assertEquals("127.0.0.1:9999", NetworkUtil.defaultPort("127.0.0.1", 9999));
+ Assert.assertEquals("[::1]:1234", NetworkUtil.defaultPort("[::1]:1234", 9999));
+ Assert.assertEquals("[::1]:9999", NetworkUtil.defaultPort("::1", 9999));
+ Assert.assertEquals("localhost.localdomain.local:9999",
+ NetworkUtil.defaultPort("localhost.localdomain.local", 9999));
+ Assert.assertEquals("localhost.localdomain.local:1234",
+ NetworkUtil.defaultPort("localhost.localdomain.local:1234", 9999));
+
+ }
+}
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index bb57cd7..f10f2fa 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -82,7 +82,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
- <version>4.1.46.Final</version>
+ <version>4.1.48.Final</version>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -277,6 +277,31 @@
<artifactId>maven-plugin-api</artifactId>
<version>3.6.3</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-model</artifactId>
+ <version>3.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-artifact</artifactId>
+ <version>3.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-compat</artifactId>
+ <version>3.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-core</artifactId>
+ <version>3.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-plugin-api</artifactId>
+ <version>3.6.3</version>
+ </dependency>
</dependencies>
</dependencyManagement>
<build>