[ASTERIXDB-2713][EXT] CSV & TSV support for external dataset p2
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- validate adapter configuration
- ignore empty lines in CSV/TSV files
- require "header" parameter for CSV/TSV formats
- make some parameters case-insensitive
- a few fixes and clean-ups
Change-Id: I2f523de0d482a358ada0c27236ff24616ad0d7da
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5848
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/data/csv/empty.csv b/asterixdb/asterix-app/data/csv/empty.csv
new file mode 100644
index 0000000..3f2ff2d
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/empty.csv
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/asterixdb/asterix-app/data/csv/sample_12.csv b/asterixdb/asterix-app/data/csv/sample_12.csv
index 2ab7c6d..0c9baf5 100644
--- a/asterixdb/asterix-app/data/csv/sample_12.csv
+++ b/asterixdb/asterix-app/data/csv/sample_12.csv
@@ -1,3 +1,4 @@
+f1,f2,f3
1,true,"text"
2,false,"text"
3,true,"text"
diff --git a/asterixdb/asterix-app/data/csv/sample_13.csv b/asterixdb/asterix-app/data/csv/sample_13.csv
new file mode 100644
index 0000000..9f53f56
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/sample_13.csv
@@ -0,0 +1,11 @@
+
+
+f1,f2,f3,f4
+
+1,,"good","recommend"
+
+2,,"bad","not recommend"
+3,,"good",
+
+
+
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 9d427ec..3906bd5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -723,7 +723,9 @@
case EXTERNAL:
ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
Map<String, String> properties = createExternalDatasetProperties(dd, metadataProvider, mdTxnCtx);
- ExternalDataUtils.defaultConfiguration(properties);
+ ExternalDataUtils.normalize(properties);
+ ExternalDataUtils.validate(properties);
+ validateExternalDatasetDetails(externalDetails, properties);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
@@ -1970,9 +1972,12 @@
MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName,
dataverseName + "." + datasetName);
try {
+ Map<String, String> properties = loadStmt.getProperties();
+ ExternalDataUtils.normalize(properties);
+ ExternalDataUtils.validate(properties);
CompiledLoadFromFileStatement cls =
new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
- loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
+ loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
cls.setSourceLocation(stmt.getSourceLocation());
JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
null, responsePrinter, warningCollector);
@@ -2170,7 +2175,10 @@
"A feed with this name " + feedName + " already exists.");
}
}
- feed = new Feed(dataverseName, feedName, cfs.getConfiguration());
+ Map<String, String> configuration = cfs.getConfiguration();
+ ExternalDataUtils.normalize(configuration);
+ ExternalDataUtils.validate(configuration);
+ feed = new Feed(dataverseName, feedName, configuration);
FeedMetadataUtil.validateFeed(feed, mdTxnCtx, appCtx);
MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3177,4 +3185,14 @@
throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
}
}
+
+ protected void validateExternalDatasetDetails(ExternalDetailsDecl externalDetails, Map<String, String> properties)
+ throws RuntimeDataException {
+ String adapter = externalDetails.getAdapter();
+ // "format" parameter is needed for "S3" data source
+ if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(adapter)
+ && properties.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_FORMAT);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
index 5728e78..f7fe18c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.1.ddl.sqlpp
@@ -25,8 +25,11 @@
CREATE TYPE t1 AS {f1: string, f2: string, f3: string, f4: string, f5: string};
CREATE TYPE t2 AS {f1: string, f2: string, f3: string};
CREATE TYPE t3 AS {f1: int?, f2: boolean, f3: string?};
+CREATE TYPE t4 AS {f1: string, f2: string, f3: string, f4: string};
-CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="csv"));
-CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sample_10.csv"), ("format"="csv"));
-CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"));
-CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"));
\ No newline at end of file
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="CSV"), ("header"="FALSE"));
+CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sample_10.csv"), ("format"="Csv"), ("header"="False"));
+CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"), ("header"="FALSE"));
+CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"));
+CREATE EXTERNAL DATASET ds5(t4) USING localfs(("path"="asterix_nc1://data/csv/sample_13.csv"), ("format"="csv"), ("header"="True"));
+CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty.csv"), ("format"="csv"), ("header"="false"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp
new file mode 100644
index 0000000..a3d113d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds5 v SELECT VALUE v ORDER BY v.f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp
new file mode 100644
index 0000000..2e5b312
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.7.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM ds6 v SELECT VALUE v ORDER BY v.f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.6.ddl.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/csv-parser-001/csv-parser-001.8.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
index c0faf16..cabe54b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/csv-tsv-parser/tsv-parser-001/tsv-parser-002.1.ddl.sqlpp
@@ -24,4 +24,4 @@
CREATE TYPE t1 AS {f1: int, f2: int, f3: string, f4: boolean, f5: bigint, f6: double};
-CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/tsv/sample_01.tsv"), ("format"="tsv"))
\ No newline at end of file
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/tsv/sample_01.tsv"), ("format"="tsv"), ("header"="FALSE"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
index b906039..6184b19 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/001/query-dataset.000.ddl.sqlpp
@@ -32,5 +32,6 @@
("serviceEndpoint"="http://localhost:8001"),
("container"="playground"),
("definition"="csv-data/reviews"),
-("format"="csv")
+("format"="Csv"),
+("header"="false")
);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
index d385bee..194adf6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/002/query-dataset.000.ddl.sqlpp
@@ -32,5 +32,6 @@
("serviceEndpoint"="http://localhost:8001"),
("container"="playground"),
("definition"="tsv-data/reviews"),
-("format"="tsv")
+("format"="TSV"),
+("header"="False")
);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp
new file mode 100644
index 0000000..e0fc056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/negative/negative.000.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// "format" parameter is missing for S3
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+DROP TYPE test IF EXISTS;
+CREATE TYPE test AS {id: int, year: int?, review: string, details: string?};
+
+DROP DATASET test IF EXISTS;
+CREATE EXTERNAL DATASET test(test) USING S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="tsv-data/reviews")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm
new file mode 100644
index 0000000..9a1d1c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.6.adm
@@ -0,0 +1,3 @@
+{ "f1": "1", "f2": "", "f3": "good", "f4": "recommend" }
+{ "f1": "2", "f2": "", "f3": "bad", "f4": "not recommend" }
+{ "f1": "3", "f2": "", "f3": "good", "f4": "" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/csv-tsv-parser/csv-parser-001/csv-parser-001.7.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
index 9948209..2456f13 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -17,7 +17,7 @@
! specific language governing permissions and limitations
! under the License.
!-->
-<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp">
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
<test-group name="external-dataset">
<test-case FilePath="external-dataset">
<compilation-unit name="aws/s3/000">
@@ -34,5 +34,11 @@
<output-dir compare="Text">aws/s3/002</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/negative">
+ <output-dir compare="Text">aws/s3/negative</output-dir>
+ <expected-error>Parameter(s) format must be specified</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index da1c4e7..14f7c83 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -241,7 +241,7 @@
public static final int LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_NAME = 3047;
public static final int OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME = 3048;
public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER = 3049;
- public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE = 3050;
+ public static final int PARSER_INVALID_CHAR_LENGTH = 3050;
public static final int PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH = 3051;
public static final int INDEXING_EXTERNAL_FILE_INDEX_ACCESSOR_UNABLE_TO_FIND_FILE_INDEX = 3052;
public static final int PARSER_ADM_DATA_PARSER_FIELD_NOT_NULL = 3053;
@@ -307,6 +307,8 @@
public static final int FAILED_TO_PARSE_METADATA = 3115;
public static final int INPUT_DECODE_FAILURE = 3116;
public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
+ public static final int PARAMETERS_REQUIRED = 3118;
+ public static final int MALFORMED_RECORD = 3119;
// Lifecycle management errors
public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 457206d..98ae1a7 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -239,7 +239,7 @@
3047 = External %1$s not supported
3048 = Invalid feed runtime: %1$s
3049 = '%1$s' is not a valid delimiter. The length of a delimiter should be 1
-3050 = '%1$s' is not a valid quote. The length of a quote should be 1
+3050 = '%1$s' is not a valid %2$s. The length of %2$s should be 1
3051 = Quote '%1$s' cannot be used with the delimiter '%2$s'
3052 = Was not able to find a file in the files index
3053 = Field %1$s can not be null
@@ -305,6 +305,8 @@
3115 = Failed to parse record metadata
3116 = Failed to decode input
3117 = Failed to parse record, malformed log record
+3118 = Parameter(s) %1$s must be specified
+3119 = Record number %1$s is malformed
# Lifecycle management errors
4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index f830376..7702dde 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -205,7 +205,7 @@
IExternalIndexer indexer = files == null ? null : ExternalIndexerProvider.getIndexer(configuration);
if (recordReaderClazz != null) {
StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
- streamReader.configure(createInputStream(ctx, partition, indexer), configuration);
+ streamReader.configure(ctx, createInputStream(ctx, partition, indexer), configuration);
if (indexer != null) {
return new IndexingStreamRecordReader(streamReader, indexer);
} else {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index 65ecd8d..aa4abb4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -134,4 +134,10 @@
strValue.getChars(0, strValue.length(), value, 0);
this.size = strValue.length();
}
+
+ public boolean isEmptyRecord() {
+ return size <= 0
+ || (size == 1 && (value[0] == ExternalDataConstants.LF || value[0] == ExternalDataConstants.CR))
+ || (size == 2 && value[0] == ExternalDataConstants.CR && value[1] == ExternalDataConstants.LF);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
index e78783a..6484d4e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
@@ -18,20 +18,16 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class AwsS3ReaderFactory extends StreamRecordReaderFactory {
@@ -73,18 +69,4 @@
// record reader
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
}
-
- @Override
- public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException {
- try {
- StreamRecordReader streamRecordReader =
- (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
- streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
- return streamRecordReader;
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException
- | NoSuchMethodException e) {
- throw HyracksDataException.create(e);
- }
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
index 07b6250..24a68a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/EmptyLineSeparatedRecordReader.java
@@ -26,6 +26,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class EmptyLineSeparatedRecordReader extends StreamRecordReader {
@@ -135,7 +136,7 @@
}
@Override
- public void configure(AsterixInputStream inputStream, Map<String, String> config) {
+ public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config) {
super.configure(inputStream);
this.config = config;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index be600ed..a27397e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class LineRecordReader extends StreamRecordReader {
@@ -42,10 +43,12 @@
private static final String REQUIRED_CONFIGS = "";
@Override
- public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
+ public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
+ throws HyracksDataException {
super.configure(inputStream);
this.hasHeader = ExternalDataUtils.hasHeader(config);
if (hasHeader) {
+ // TODO(ali): revisit this and notifyNewSource
inputStream.setNotificationHandler(this);
}
}
@@ -100,13 +103,16 @@
startPosn = bufferPosn = 0;
bufferLength = reader.read(inputBuffer);
if (bufferLength <= 0) {
- if (readLength > 0) {
- record.endRecord();
- recordNumber++;
- return true;
+ if (readLength <= 0) {
+ close();
+ return false; //EOF
}
- close();
- return false; //EOF
+ record.endRecord();
+ if (record.isEmptyRecord()) {
+ return false;
+ }
+ recordNumber++;
+ return true;
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
@@ -130,6 +136,9 @@
record.append(inputBuffer, startPosn, readLength);
}
} while (newlineLength == 0);
+ if (record.isEmptyRecord()) {
+ continue;
+ }
if (nextIsHeader) {
nextIsHeader = false;
continue;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 1fd328b..564df4b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -24,39 +24,35 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
public class QuotedLineRecordReader extends LineRecordReader {
private char quote;
private char quoteEscape;
+ private IWarningCollector warningCollector;
+ private final SourceLocation srcLoc = new SourceLocation(-1, -1);
private static final List<String> recordReaderFormats = Collections.unmodifiableList(
Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT, ExternalDataConstants.FORMAT_CSV));
private static final String REQUIRED_CONFIGS = ExternalDataConstants.KEY_QUOTE;
@Override
- public void configure(AsterixInputStream inputStream, Map<String, String> config) throws HyracksDataException {
- super.configure(inputStream, config);
+ public void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
+ throws HyracksDataException {
+ super.configure(ctx, inputStream, config);
+ this.warningCollector = ctx.getWarningCollector();
String quoteString = config.get(ExternalDataConstants.KEY_QUOTE);
- if (quoteString.length() != 1) {
- throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE,
- ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
- }
+ ExternalDataUtils.validateQuote(quoteString);
this.quote = quoteString.charAt(0);
- String escapeString = config.get(ExternalDataConstants.KEY_QUOTE_ESCAPE);
- if (escapeString == null) {
- quoteEscape = ExternalDataConstants.ESCAPE;
- } else {
- if (escapeString.length() != 1) {
- throw new HyracksDataException(
- ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_QUOTE_ESCAPE,
- ExternalDataConstants.PARAMETER_OF_SIZE_ONE, escapeString));
- }
- quoteEscape = escapeString.charAt(0);
- }
+ this.quoteEscape = ExternalDataUtils.validateGetQuoteEscape(config);
}
@Override
@@ -87,16 +83,22 @@
startPosn = bufferPosn = 0;
bufferLength = reader.read(inputBuffer);
if (bufferLength <= 0) {
- if (readLength > 0) {
- if (inQuote) {
- throw new IOException("malformed input record ended inside quote");
+ // reached end of stream
+ if (readLength <= 0 || inQuote) {
+ // either nothing was read for this record, OR the stream ended while still inside a quoted field
+ if (inQuote && warningCollector.shouldWarn()) {
+ warningCollector
+ .warn(WarningUtil.forAsterix(srcLoc, ErrorCode.MALFORMED_RECORD, recordNumber));
}
- record.endRecord();
- recordNumber++;
- return true;
+ close();
+ return false;
}
- close();
- return false;
+ record.endRecord();
+ if (record.isEmptyRecord()) {
+ return false;
+ }
+ recordNumber++;
+ return true;
}
}
boolean maybeInQuote = false;
@@ -121,12 +123,9 @@
// this is an opening quote
inQuote = true;
}
- if (prevCharEscape) {
- prevCharEscape = false;
- } else {
- // the quoteEscape != quote is for making an opening quote not an escape
- prevCharEscape = inputBuffer[bufferPosn] == quoteEscape && quoteEscape != quote;
- }
+ // the quoteEscape != quote is for making an opening quote not an escape
+ prevCharEscape =
+ inputBuffer[bufferPosn] == quoteEscape && !prevCharEscape && quoteEscape != quote;
} else {
// if quote == quoteEscape and current char is quote, then it could be closing or escaping
if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
@@ -146,6 +145,9 @@
record.append(inputBuffer, startPosn, readLength);
}
} while (newlineLength == 0);
+ if (record.isEmptyRecord()) {
+ continue;
+ }
if (nextIsHeader) {
nextIsHeader = false;
continue;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 883f0ee..5ab5730 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SemiStructuredRecordReader extends StreamRecordReader {
@@ -45,7 +46,8 @@
private static final String REQUIRED_CONFIGS = "";
@Override
- public void configure(AsterixInputStream stream, Map<String, String> config) throws HyracksDataException {
+ public void configure(IHyracksTaskContext ctx, AsterixInputStream stream, Map<String, String> config)
+ throws HyracksDataException {
super.configure(stream);
String recStartString = config.get(ExternalDataConstants.KEY_RECORD_START);
String recEndString = config.get(ExternalDataConstants.KEY_RECORD_END);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 5629f48..4aed741 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -31,6 +31,7 @@
import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler {
@@ -100,6 +101,6 @@
public abstract String getRequiredConfigs();
- public abstract void configure(AsterixInputStream inputStream, Map<String, String> config)
+ public abstract void configure(IHyracksTaskContext ctx, AsterixInputStream inputStream, Map<String, String> config)
throws HyracksDataException;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 076842e..9bb50f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -95,7 +95,7 @@
try {
StreamRecordReader streamRecordReader =
(StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
- streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
+ streamRecordReader.configure(ctx, streamFactory.createInputStream(ctx, partition), configuration);
return streamRecordReader;
} catch (InstantiationException | IllegalAccessException | InvocationTargetException
| NoSuchMethodException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 8facce6..505acbd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -35,6 +35,7 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
@@ -43,22 +44,23 @@
public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
+ private final IHyracksTaskContext ctx;
private final char fieldDelimiter;
private final char quote;
private final boolean hasHeader;
- private ARecordType recordType;
- private IARecordBuilder recBuilder;
- private ArrayBackedValueStorage fieldValueBuffer;
- private DataOutput fieldValueBufferOutput;
- private IValueParser[] valueParsers;
+ private final ARecordType recordType;
+ private final IARecordBuilder recBuilder;
+ private final ArrayBackedValueStorage fieldValueBuffer;
+ private final DataOutput fieldValueBufferOutput;
+ private final IValueParser[] valueParsers;
private FieldCursorForDelimitedDataParser cursor;
- private byte[] fieldTypeTags;
- private int[] fldIds;
- private ArrayBackedValueStorage[] nameBuffers;
- private boolean areAllNullFields;
+ private final byte[] fieldTypeTags;
+ private final int[] fldIds;
+ private final ArrayBackedValueStorage[] nameBuffers;
- public DelimitedDataParser(IValueParserFactory[] valueParserFactories, char fieldDelimiter, char quote,
- boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
+ public DelimitedDataParser(IHyracksTaskContext ctx, IValueParserFactory[] valueParserFactories, char fieldDelimiter,
+ char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
+ this.ctx = ctx;
this.fieldDelimiter = fieldDelimiter;
this.quote = quote;
this.hasHeader = hasHeader;
@@ -107,10 +109,8 @@
try {
while (cursor.nextRecord()) {
parseRecord();
- if (!areAllNullFields) {
- recBuilder.write(out, true);
- return true;
- }
+ recBuilder.write(out, true);
+ return true;
}
return false;
} catch (IOException e) {
@@ -121,7 +121,6 @@
private void parseRecord() throws HyracksDataException {
recBuilder.reset(recordType);
recBuilder.init();
- areAllNullFields = true;
for (int i = 0; i < valueParsers.length; ++i) {
try {
@@ -152,7 +151,6 @@
}
valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(), cursor.getFieldLength(),
fieldValueBufferOutput);
- areAllNullFields = false;
}
if (fldIds[i] < 0) {
recBuilder.addField(nameBuffers[i], fieldValueBuffer);
@@ -172,9 +170,7 @@
public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
cursor.nextRecord(record.get(), record.size());
parseRecord();
- if (!areAllNullFields) {
- recBuilder.write(out, true);
- }
+ recBuilder.write(out, true);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
index 1fee49f..46c5152 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DelimitedDataParserFactory.java
@@ -42,15 +42,15 @@
@Override
public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
- return createParser();
+ return createParser(ctx);
}
- private DelimitedDataParser createParser() throws HyracksDataException {
+ private DelimitedDataParser createParser(IHyracksTaskContext ctx) throws HyracksDataException {
IValueParserFactory[] valueParserFactories = ExternalDataUtils.getValueParserFactories(recordType);
- char delimiter = ExternalDataUtils.getDelimiter(configuration);
- char quote = ExternalDataUtils.getQuote(configuration, delimiter);
+ char delimiter = ExternalDataUtils.validateGetDelimiter(configuration);
+ char quote = ExternalDataUtils.validateGetQuote(configuration, delimiter);
boolean hasHeader = ExternalDataUtils.hasHeader(configuration);
- return new DelimitedDataParser(valueParserFactories, delimiter, quote, hasHeader, recordType,
+ return new DelimitedDataParser(ctx, valueParserFactories, delimiter, quote, hasHeader, recordType,
ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM));
}
@@ -62,7 +62,7 @@
@Override
public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- return createParser();
+ return createParser(ctx);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
index d34b5e0..704ea29 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -55,7 +55,7 @@
String recordFormat = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT);
if (recordFormat == null) {
throw AlgebricksException.create(ErrorCode.UNKNOWN_RECORD_FORMAT_FOR_META_PARSER,
- ExternalDataConstants.KEY_FORMAT);
+ ExternalDataConstants.KEY_RECORD_FORMAT);
}
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
if (format == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 53cf6b1..2265a25 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -46,15 +46,12 @@
public static IDataParserFactory getDataParserFactory(ILibraryManager libraryManager,
Map<String, String> configuration) throws AsterixException {
IDataParserFactory parserFactory;
- String parserFactoryName = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
+ String parserFactoryName = configuration.get(ExternalDataConstants.KEY_PARSER);
if (ExternalDataUtils.isExternal(parserFactoryName)) {
return ExternalDataUtils.createExternalParserFactory(libraryManager,
ExternalDataUtils.getDataverse(configuration), parserFactoryName);
} else {
- String parserFactoryKey = ExternalDataUtils.getRecordFormat(configuration);
- if (parserFactoryKey == null) {
- parserFactoryKey = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
- }
+ String parserFactoryKey = ExternalDataUtils.getParserFactory(configuration);
parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
}
return parserFactory;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 1378207..a05ad77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.external.util;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
public class ExternalDataConstants {
private ExternalDataConstants() {
@@ -168,6 +172,26 @@
public static final String FORMAT_LINE_SEPARATED = "line-separated";
public static final String FORMAT_HDFS_WRITABLE = "hdfs-writable";
public static final String FORMAT_KV = "kv";
+ public static final String FORMAT_CSV = "csv";
+ public static final String FORMAT_TSV = "tsv";
+ public static final Set<String> ALL_FORMATS;
+ static {
+ Set<String> formats = new HashSet<>(13);
+ formats.add(FORMAT_HIVE);
+ formats.add(FORMAT_BINARY);
+ formats.add(FORMAT_ADM);
+ formats.add(FORMAT_JSON_LOWER_CASE);
+ formats.add(FORMAT_DELIMITED_TEXT);
+ formats.add(FORMAT_TWEET);
+ formats.add(FORMAT_RSS);
+ formats.add(FORMAT_SEMISTRUCTURED);
+ formats.add(FORMAT_LINE_SEPARATED);
+ formats.add(FORMAT_HDFS_WRITABLE);
+ formats.add(FORMAT_KV);
+ formats.add(FORMAT_CSV);
+ formats.add(FORMAT_TSV);
+ ALL_FORMATS = Collections.unmodifiableSet(formats);
+ }
/**
* input streams
@@ -234,8 +258,6 @@
public static final String EXTERNAL = "external";
public static final String KEY_READER_FACTORY = "reader-factory";
public static final String READER_RSS = "rss_feed";
- public static final String FORMAT_CSV = "csv";
- public static final String FORMAT_TSV = "tsv";
public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 443aa7e..b7b441b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -44,41 +44,49 @@
public class ExternalDataUtils {
+ private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
+ static {
+ valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+ valueParserFactoryMap.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
+ }
+
private ExternalDataUtils() {
}
// Get a delimiter from the given configuration
- public static char getDelimiter(Map<String, String> configuration) throws HyracksDataException {
+ public static char validateGetDelimiter(Map<String, String> configuration) throws HyracksDataException {
String delimiterValue = configuration.get(ExternalDataConstants.KEY_DELIMITER);
if (delimiterValue == null) {
- delimiterValue = ExternalDataConstants.DEFAULT_DELIMITER;
- } else if (delimiterValue.length() != 1) {
- throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER,
- delimiterValue);
+ return ExternalDataConstants.DEFAULT_DELIMITER.charAt(0);
}
+ validateDelimiter(delimiterValue);
return delimiterValue.charAt(0);
}
// Get a quote from the given configuration when the delimiter is given
// Need to pass delimiter to check whether they share the same character
- public static char getQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
+ public static char validateGetQuote(Map<String, String> configuration, char delimiter) throws HyracksDataException {
String quoteValue = configuration.get(ExternalDataConstants.KEY_QUOTE);
if (quoteValue == null) {
- quoteValue = ExternalDataConstants.DEFAULT_QUOTE;
- } else if (quoteValue.length() != 1) {
- throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_QUOTE,
- quoteValue);
+ quoteValue = ExternalDataConstants.DEFAULT_QUOTE; // default must also differ from delimiter
}
+ validateQuote(quoteValue);
+ char quote = quoteValue.charAt(0);
+ validateDelimiterAndQuote(delimiter, quote);
+ return quote;
+ }
- // Since delimiter (char type value) can't be null,
- // we only check whether delimiter and quote use the same character
- if (quoteValue.charAt(0) == delimiter) {
- throw new RuntimeDataException(
- ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quoteValue,
- delimiter);
+ public static char validateGetQuoteEscape(Map<String, String> configuration) throws HyracksDataException {
+ String quoteEscapeValue = configuration.get(ExternalDataConstants.KEY_QUOTE_ESCAPE);
+ if (quoteEscapeValue == null) {
+ return ExternalDataConstants.ESCAPE;
}
-
- return quoteValue.charAt(0);
+ validateQuoteEscape(quoteEscapeValue);
+ return quoteEscapeValue.charAt(0);
}
public static void validateDataParserParameters(Map<String, String> configuration) throws AsterixException {
@@ -86,8 +94,8 @@
if (parser == null) {
String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
if (parserFactory == null) {
- throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " or "
- + ExternalDataConstants.KEY_PARSER_FACTORY + " must be specified.");
+ throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED,
+ ExternalDataConstants.KEY_FORMAT + " or " + ExternalDataConstants.KEY_PARSER_FACTORY);
}
}
}
@@ -95,7 +103,7 @@
public static void validateDataSourceParameters(Map<String, String> configuration) throws AsterixException {
String reader = configuration.get(ExternalDataConstants.KEY_READER);
if (reader == null) {
- throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
+ throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_READER);
}
}
@@ -141,22 +149,13 @@
return configuration.get(ExternalDataConstants.KEY_DATAVERSE);
}
- public static String getRecordFormat(Map<String, String> configuration) {
- String parserFormat = configuration.get(ExternalDataConstants.KEY_DATA_PARSER);
- return parserFormat != null ? parserFormat : configuration.get(ExternalDataConstants.KEY_FORMAT);
- }
-
- private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
-
- private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
- Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class);
- m.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
- m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
- m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
- m.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
- m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
- m.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
- return m;
+ public static String getParserFactory(Map<String, String> configuration) {
+ String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER);
+ if (parserFactory != null) {
+ return parserFactory;
+ }
+ parserFactory = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ return parserFactory != null ? parserFactory : configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
}
public static IValueParserFactory[] getValueParserFactories(ARecordType recordType) {
@@ -284,8 +283,7 @@
public static int getNumberOfKeys(Map<String, String> configuration) throws AsterixException {
String keyIndexes = configuration.get(ExternalDataConstants.KEY_KEY_INDEXES);
if (keyIndexes == null) {
- throw new AsterixException(
- "A change feed must have the parameter " + ExternalDataConstants.KEY_KEY_INDEXES);
+ throw AsterixException.create(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_KEY_INDEXES);
}
return keyIndexes.split(",").length;
}
@@ -340,7 +338,7 @@
}
/**
- * Prepares the configuration of the external dataset and its adapter by filling the information required by
+ * Prepares the configuration of the external data and its adapter by filling the information required by
* adapters and parsers.
*
* @param adapterName adapter name
@@ -355,4 +353,82 @@
configuration.put(ExternalDataConstants.KEY_PARSER, configuration.get(ExternalDataConstants.KEY_FORMAT));
}
}
+
+ /**
+ * Normalizes (trims and lower-cases, independently of the JVM default locale) the "format" and "header" values
+ * of the adapter configuration in place. This should happen before persisting the metadata (e.g. when creating
+ * external datasets or feeds) and when creating an adapter factory. Unrecognized formats are left untouched.
+ * @param configuration external data configuration
+ */
+ public static void normalize(Map<String, String> configuration) {
+ // normalize the "format" parameter; Locale.ROOT avoids locale-specific case mappings (e.g. Turkish dotless i)
+ String paramValue = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ if (paramValue != null) {
+ String lowerCaseFormat = paramValue.toLowerCase(java.util.Locale.ROOT).trim();
+ if (ExternalDataConstants.ALL_FORMATS.contains(lowerCaseFormat)) {
+ configuration.put(ExternalDataConstants.KEY_FORMAT, lowerCaseFormat);
+ }
+ }
+ // normalize the "header" parameter
+ paramValue = configuration.get(ExternalDataConstants.KEY_HEADER);
+ if (paramValue != null) {
+ configuration.put(ExternalDataConstants.KEY_HEADER, paramValue.toLowerCase(java.util.Locale.ROOT).trim());
+ }
+ }
+
+ /**
+ * Validates the parameter values of the adapter configuration. This should happen after normalizing the values.
+ *
+ * @param configuration external data configuration
+ * @throws HyracksDataException HyracksDataException
+ */
+ public static void validate(Map<String, String> configuration) throws HyracksDataException {
+ String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ String header = configuration.get(ExternalDataConstants.KEY_HEADER);
+ if (format != null && isHeaderRequiredFor(format) && header == null) {
+ throw new RuntimeDataException(ErrorCode.PARAMETERS_REQUIRED, ExternalDataConstants.KEY_HEADER);
+ }
+ if (header != null && !isBoolean(header)) {
+ throw new RuntimeDataException(ErrorCode.INVALID_REQ_PARAM_VAL, ExternalDataConstants.KEY_HEADER, header);
+ }
+ char delimiter = validateGetDelimiter(configuration);
+ validateGetQuote(configuration, delimiter);
+ validateGetQuoteEscape(configuration);
+ }
+
+ private static boolean isHeaderRequiredFor(String format) {
+ return format.equals(ExternalDataConstants.FORMAT_CSV) || format.equals(ExternalDataConstants.FORMAT_TSV);
+ }
+
+ private static boolean isBoolean(String value) {
+ return value.equals(ExternalDataConstants.TRUE) || value.equals(ExternalDataConstants.FALSE);
+ }
+
+ private static void validateDelimiter(String delimiter) throws RuntimeDataException {
+ if (delimiter.length() != 1) {
+ throw new RuntimeDataException(ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_NOT_VALID_DELIMITER,
+ delimiter);
+ }
+ }
+
+ public static void validateQuote(String quote) throws RuntimeDataException {
+ if (quote.length() != 1) {
+ throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quote,
+ ExternalDataConstants.KEY_QUOTE);
+ }
+ }
+
+ private static void validateQuoteEscape(String quoteEsc) throws RuntimeDataException {
+ if (quoteEsc.length() != 1) {
+ throw new RuntimeDataException(ErrorCode.PARSER_INVALID_CHAR_LENGTH, quoteEsc,
+ ExternalDataConstants.KEY_QUOTE_ESCAPE);
+ }
+ }
+
+ private static void validateDelimiterAndQuote(char delimiter, char quote) throws RuntimeDataException {
+ if (quote == delimiter) {
+ throw new RuntimeDataException(
+ ErrorCode.PARSER_FACTORY_DELIMITED_DATA_PARSER_FACTORY_QUOTE_DELIMITER_MISMATCH, quote, delimiter);
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index 47f784a..8ef6273 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -48,9 +48,11 @@
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Assert;
import junit.framework.Test;
@@ -58,6 +60,9 @@
import junit.framework.TestSuite;
public class ClassAdToADMTest extends TestCase {
+
+ private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
+
/**
* Create the test case
*
@@ -123,7 +128,7 @@
FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
LocalFSInputStream in = new LocalFSInputStream(watcher);
SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
- recordReader.configure(in, config);
+ recordReader.configure(ctx, in, config);
while (recordReader.hasNext()) {
tb.reset();
IRawRecord<char[]> record = recordReader.next();
@@ -162,7 +167,7 @@
FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
LocalFSInputStream in = new LocalFSInputStream(watcher);
SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
- recordReader.configure(in, config);
+ recordReader.configure(ctx, in, config);
try {
Value val = new Value(objectPool);
while (recordReader.hasNext()) {
@@ -204,7 +209,7 @@
FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
LocalFSInputStream in = new LocalFSInputStream(watcher);
SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader();
- recordReader.configure(in, config);
+ recordReader.configure(ctx, in, config);
try {
Value val = new Value(objectPool);
while (recordReader.hasNext()) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
index 2ba5a3e..2972542 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/ByteBufUTF8DecodeTest.java
@@ -41,6 +41,8 @@
import org.apache.asterix.external.input.stream.LocalFSInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -56,6 +58,7 @@
private final CharBuffer chars = CharBuffer.allocate(BUFFER_SIZE);
private final CharArrayRecord value = new CharArrayRecord();
private final ByteBuf nettyBuffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(KB32, Integer.MAX_VALUE);
+ private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
@Test
public void eatGlass() {
@@ -83,7 +86,7 @@
FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
LocalFSInputStream in = new LocalFSInputStream(watcher);
try (SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader()) {
- recordReader.configure(in, config);
+ recordReader.configure(ctx, in, config);
while (recordReader.hasNext()) {
try {
IRawRecord<char[]> record = recordReader.next();
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index 1584065..47c6ffe 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -44,12 +44,15 @@
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Assert;
public class RecordWithMetaTest {
+ private final IHyracksTaskContext ctx = TestUtils.createHyracksTask();
private static ARecordType recordType;
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -90,7 +93,7 @@
config.put(ExternalDataConstants.KEY_HEADER, "true");
config.put(ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.DEFAULT_QUOTE);
LineRecordReader lineReader = new LineRecordReader();
- lineReader.configure(inputStream, config);
+ lineReader.configure(ctx, inputStream, config);
// create csv with json record reader
CSVToRecordWithMetadataAndPKConverter recordConverter = new CSVToRecordWithMetadataAndPKConverter(
valueIndex, delimiter, metaType, recordType, pkIndicators, pkIndexes, keyTypes);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 7ed53e4..071fd1c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -135,13 +135,13 @@
default:
throw new AsterixException("Unknown Adapter type " + adapterType);
}
- adapterFactory.setOutputType(adapterOutputType);
- adapterFactory.setMetaType(metaType);
- adapterFactory.configure(appCtx.getServiceContext(), configuration);
} else {
- AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(), adapterName, configuration,
- adapterOutputType, metaType);
+ ExternalDataUtils.prepare(adapterName, configuration);
+ adapterFactory = (ITypedAdapterFactory) appCtx.getAdapterFactoryService().createAdapterFactory();
}
+ adapterFactory.setOutputType(adapterOutputType);
+ adapterFactory.setMetaType(metaType);
+ adapterFactory.configure(appCtx.getServiceContext(), configuration);
if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
if (metaType == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index e44480c..3f78234 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -51,6 +51,7 @@
public class TestUtils {
+ private static final int DEFAULT_FRAME_SIZE = 32768;
public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() {
@Override
public void warn(Warning warning) {
@@ -68,6 +69,10 @@
}
};
+ public static IHyracksTaskContext createHyracksTask() {
+ return create(DEFAULT_FRAME_SIZE);
+ }
+
public static IHyracksTaskContext create(int frameSize) {
IOManager ioManager = null;
try {