[ASTERIXDB-2726][EXT] Report line number instead of record number in messages of parsers
- user model changes: no
- storage format changes: no
- interface changes: yes
IRecordReader:
added getLineNumber() to provide line number for parsers and converters.
IRecordConverter:
added configure() to pass the line number supplier to the record converter.
IRecordDataParser:
pass line number supplier from the Reader to the Parser.
Details:
Report line number instead of record number in messages of parsers.
- added getPreviousStreamName() to allow readers to report errors happening on
the previous stream when the underlying stream has already switched to a new one.
- changed the test executor to compare actual warnings issues by a test case with
the expected warnigns properly.
Change-Id: I00508d8eeca4d9bae95f55ab51ecfb0ce2ced6b0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6245
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/data/csv/error1_line_num.csv b/asterixdb/asterix-app/data/csv/error1_line_num.csv
new file mode 100644
index 0000000..34bcee9
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/error1_line_num.csv
@@ -0,0 +1,3 @@
+1,"good","recommend"
+
+2,"bad" ,"not recommend"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/csv/error2_line_num.csv b/asterixdb/asterix-app/data/csv/error2_line_num.csv
new file mode 100644
index 0000000..0f1286f
--- /dev/null
+++ b/asterixdb/asterix-app/data/csv/error2_line_num.csv
@@ -0,0 +1,5 @@
+1,"good","recommend"
+2,"bad and
+not so good and
+bad" ,"not recommend"
+3,"good","recommend"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/json/malformed-json-no-closing.json b/asterixdb/asterix-app/data/json/malformed-json-no-closing.json
new file mode 100644
index 0000000..83f3087
--- /dev/null
+++ b/asterixdb/asterix-app/data/json/malformed-json-no-closing.json
@@ -0,0 +1,2 @@
+{ "field1": 1, "field2": "text"
+
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 36a06c0..d4f92fb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -23,6 +23,7 @@
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -129,8 +130,7 @@
}
@Override
- protected void ensureWarnings(int actualWarnCount, int expectedWarnCount, TestCase.CompilationUnit cUnit)
- throws Exception {
+ protected void ensureWarnings(BitSet expectedWarnings, TestCase.CompilationUnit cUnit) throws Exception {
// skip checking warnings as currently cancelled queries with warnings might not run successfully at all
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java
index ab90244..a1ed12b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/IPollTask.java
@@ -19,6 +19,7 @@
package org.apache.asterix.test.common;
import java.io.File;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -43,11 +44,11 @@
* @param expectedResultFileCtxs
* @param testFile
* @param actualPath
- * @param actualWarnCount
+ * @param expectedWarnings
*/
void execute(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, String statement,
boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
- List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, MutableInt actualWarnCount)
+ List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath, BitSet expectedWarnings)
throws Exception;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index a10dc54..10869c1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -45,6 +45,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -53,6 +54,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -65,6 +67,7 @@
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.asterix.api.http.server.QueryServiceRequestParameters;
@@ -159,9 +162,9 @@
private static final ContentType TEXT_PLAIN_UTF8 = ContentType.create(HttpUtil.ContentType.APPLICATION_JSON, UTF_8);
private final IPollTask plainExecutor = (testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
- queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount) -> executeTestFile(testCaseCtx,
+ queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings) -> executeTestFile(testCaseCtx,
ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs,
- testFile, actualPath, actualWarnCount);
+ testFile, actualPath, expectedWarnings);
public static final String DELIVERY_ASYNC = "async";
public static final String DELIVERY_DEFERRED = "deferred";
@@ -947,7 +950,7 @@
public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
- MutableInt actualWarnCount) throws Exception {
+ BitSet expectedWarnings) throws Exception {
InputStream resultStream;
File qbcFile;
boolean failed = false;
@@ -975,11 +978,11 @@
case "pollquery":
poll(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
expectedResultFileCtxs, testFile, actualPath, ctx.getType().substring("poll".length()),
- actualWarnCount, plainExecutor);
+ expectedWarnings, plainExecutor);
break;
case "polldynamic":
polldynamic(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
- expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+ expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
break;
case "query":
case "async":
@@ -1006,7 +1009,7 @@
if (testCaseCtx.getTestCase().isCheckWarnings()) {
boolean expectedSourceLoc = testCaseCtx.isSourceLocationExpected(cUnit);
- validateWarnings(extractedResult.getWarnings(), cUnit.getExpectedWarn(), actualWarnCount,
+ validateWarnings(extractedResult.getWarnings(), cUnit.getExpectedWarn(), expectedWarnings,
expectedSourceLoc);
}
break;
@@ -1413,17 +1416,17 @@
private void polldynamic(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
- MutableInt actualWarnCount) throws Exception {
+ BitSet expectedWarnings) throws Exception {
IExpectedResultPoller poller = getExpectedResultPoller(statement);
final String key = getKey(statement);
poll(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs,
- testFile, actualPath, "validate", actualWarnCount, new IPollTask() {
+ testFile, actualPath, "validate", expectedWarnings, new IPollTask() {
@Override
public void execute(TestCaseContext testCaseCtx, TestFileContext ctx,
Map<String, Object> variableCtx, String statement, boolean isDmlRecoveryTest,
ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
- MutableInt actualWarnCount) throws Exception {
+ BitSet expectedWarnings) throws Exception {
File actualResultFile = new File(actualPath, testCaseCtx.getTestCase().getFilePath()
+ File.separatorChar + cUnit.getName() + '.' + ctx.getSeqNum() + ".polled.adm");
if (actualResultFile.exists() && !actualResultFile.delete()) {
@@ -1461,7 +1464,7 @@
private void poll(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
- String newType, MutableInt actualWarnCount, IPollTask pollTask) throws Exception {
+ String newType, BitSet expectedWarnings, IPollTask pollTask) throws Exception {
// polltimeoutsecs=nnn, polldelaysecs=nnn
int timeoutSecs = getTimeoutSecs(statement);
int retryDelaySecs = getRetryDelaySecs(statement);
@@ -1484,7 +1487,7 @@
try {
startSemaphore.release();
pollTask.execute(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
- queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+ queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
} finally {
endSemaphore.release();
}
@@ -1798,7 +1801,8 @@
for (CompilationUnit cUnit : cUnits) {
List<String> expectedErrors = cUnit.getExpectedError();
int expectedWarnCount = cUnit.getExpectedWarn().size();
- MutableInt actualWarnCount = new MutableInt(0);
+ BitSet expectedWarnings = new BitSet(cUnit.getExpectedWarn().size());
+ expectedWarnings.set(0, cUnit.getExpectedWarn().size());
LOGGER.info(
"Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
Map<String, Object> variableCtx = new HashMap<>();
@@ -1818,7 +1822,7 @@
try {
if (!testFile.getName().startsWith(DIAGNOSE)) {
executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
- queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+ queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
}
} catch (TestLoop loop) {
// rewind the iterator until we find our target
@@ -1850,7 +1854,7 @@
throw new Exception(
"Test \"" + cUnit.getName() + "\" FAILED; expected exception was not thrown...");
}
- ensureWarnings(actualWarnCount.getValue(), expectedWarnCount, cUnit);
+ ensureWarnings(expectedWarnings, cUnit);
LOGGER.info(
"[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " PASSED ");
if (passedGroup != null) {
@@ -1873,7 +1877,7 @@
final File file = ctx.getFile();
final String statement = readTestFile(file);
executeTestFile(testCaseCtx, ctx, variableCtx, statement, false, pb, cUnit, new MutableInt(-1),
- Collections.emptyList(), file, null, new MutableInt(-1));
+ Collections.emptyList(), file, null, new BitSet());
}
}
} catch (Exception diagnosticFailure) {
@@ -2074,9 +2078,17 @@
LOGGER.info("Cluster state now " + desiredState);
}
- protected void ensureWarnings(int actualWarnCount, int expectedWarnCount, CompilationUnit cUnit) throws Exception {
- if (actualWarnCount < expectedWarnCount) {
- LOGGER.error("Test {} failed to raise (an) expected warning(s)", cUnit.getName());
+ protected void ensureWarnings(BitSet expectedWarnings, CompilationUnit cUnit) throws Exception {
+ boolean fail = !expectedWarnings.isEmpty();
+ if (fail) {
+ LOGGER.error("Test {} failed to raise (an) expected warning(s):", cUnit.getName());
+ }
+ List<String> expectedWarn = cUnit.getExpectedWarn();
+ for (int i = expectedWarnings.nextSetBit(0); i >= 0; i = expectedWarnings.nextSetBit(i + 1)) {
+ String warning = expectedWarn.get(i);
+ LOGGER.error(warning);
+ }
+ if (fail) {
throw new Exception("Test \"" + cUnit.getName() + "\" FAILED; expected warning(s) was not returned...");
}
}
@@ -2212,22 +2224,30 @@
return extension.endsWith(AQL) ? getEndpoint(Servlets.QUERY_AQL) : getEndpoint(Servlets.QUERY_SERVICE);
}
- private void validateWarnings(List<String> actualWarnings, List<String> expectedWarn, MutableInt actualWarnCount,
+ private void validateWarnings(List<String> actualWarnings, List<String> expectedWarn, BitSet expectedWarnings,
boolean expectedSourceLoc) throws Exception {
if (actualWarnings != null) {
for (String actualWarn : actualWarnings) {
- if (expectedWarn.stream().noneMatch(actualWarn::contains)) {
- throw new Exception("unexpected warning was encountered (" + actualWarn + ")");
+ OptionalInt first = IntStream.range(0, expectedWarn.size())
+ .filter(i -> actualWarn.contains(expectedWarn.get(i)) && expectedWarnings.get(i)).findFirst();
+ if (!first.isPresent()) {
+ String msg = "unexpected warning was encountered or has already been matched (" + actualWarn + ")";
+ LOGGER.error(msg);
+ if (!expectedWarnings.isEmpty()) {
+ LOGGER.error("was expecting the following warnings: ");
+ }
+ for (int i = expectedWarnings.nextSetBit(0); i >= 0; i = expectedWarnings.nextSetBit(i + 1)) {
+ LOGGER.error(expectedWarn.get(i));
+ }
+ throw new Exception(msg);
}
if (expectedSourceLoc && !containsSourceLocation(actualWarn)) {
throw new Exception(MessageFormat.format(
"Expected to find source location \"{}, {}\" in warning text: +++++{}+++++",
ERR_MSG_SRC_LOC_LINE_REGEX, ERR_MSG_SRC_LOC_COLUMN_REGEX, actualWarn));
}
- actualWarnCount.increment();
- if (actualWarnCount.getValue() > expectedWarn.size()) {
- throw new Exception("returned warnings exceeded expected warnings");
- }
+ int warningIndex = first.getAsInt();
+ expectedWarnings.clear(warningIndex);
}
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 37a3916..55c78e3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -28,6 +28,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -385,11 +386,11 @@
public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, TestCase.CompilationUnit cUnit,
MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
- MutableInt actualWarnCount) throws Exception {
+ BitSet expectedWarnings) throws Exception {
String[] lines;
switch (ctx.getType()) {
case "s3bucket":
- // <bucket_name> <def_name> <sub-path:src_file1,sub-path:src_file2,sub-path:src_file3>
+ // <bucket> <def> <sub-path:new_fname:src_file1,sub-path:new_fname:src_file2,sub-path:src_file3>
lines = TestExecutor.stripAllComments(statement).trim().split("\n");
String lastLine = lines[lines.length - 1];
String[] command = lastLine.trim().split(" ");
@@ -401,7 +402,7 @@
break;
default:
super.executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
- queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
+ queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
}
}
}
@@ -425,23 +426,37 @@
int size = s3pathAndSourceFile.length;
String path;
String sourceFilePath;
- String sourceFileName;
+ String uploadedFileName;
if (size == 1) {
// case: playground json-data/reviews SOURCE_FILE1,SOURCE_FILE2
path = definitionPath;
sourceFilePath = s3pathAndSourceFile[0];
- sourceFileName = FilenameUtils.getName(s3pathAndSourceFile[0]);
- } else {
+ uploadedFileName = FilenameUtils.getName(s3pathAndSourceFile[0]);
+ } else if (size == 2) {
// case: playground json-data/reviews level1/sub-level:SOURCE_FILE1,level2/sub-level:SOURCE_FILE2
+ String subPathOrNewFileName = s3pathAndSourceFile[0];
+ if (subPathOrNewFileName.startsWith("$$")) {
+ path = definitionPath;
+ sourceFilePath = s3pathAndSourceFile[1];
+ uploadedFileName = subPathOrNewFileName.substring(2);
+ } else {
+ path = definitionPath + subPathOrNewFileName + (subPathOrNewFileName.endsWith("/") ? "" : "/");
+ sourceFilePath = s3pathAndSourceFile[1];
+ uploadedFileName = FilenameUtils.getName(s3pathAndSourceFile[1]);
+ }
+ } else if (size == 3) {
path = definitionPath + s3pathAndSourceFile[0] + (s3pathAndSourceFile[0].endsWith("/") ? "" : "/");
- sourceFilePath = s3pathAndSourceFile[1];
- sourceFileName = FilenameUtils.getName(s3pathAndSourceFile[1]);
+ uploadedFileName = s3pathAndSourceFile[1];
+ sourceFilePath = s3pathAndSourceFile[2];
+
+ } else {
+ throw new IllegalArgumentException();
}
- String keyPath = path + sourceFileName;
+ String keyPath = path + uploadedFileName;
int k = 1;
while (fileNames.contains(keyPath)) {
- keyPath = path + (k++) + sourceFileName;
+ keyPath = path + (k++) + uploadedFileName;
}
fileNames.add(keyPath);
client.putObject(PUT_OBJECT_BUILDER.bucket(bucketName).key(keyPath).build(),
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
index f7c5c61..6df570c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.002.ddl.sqlpp
@@ -23,8 +23,10 @@
DROP TYPE t1 IF EXISTS;
DROP TYPE t2 IF EXISTS;
+DROP TYPE t3 IF EXISTS;
CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};
CREATE TYPE t2 AS {f1: bigint, f2: bigint?, f3: double, f4: double?, f5: string, f6: string?, f7: boolean, f8: boolean?};
+CREATE TYPE t3 AS {f1: bigint, f2: string, f3: string};
DROP DATASET ds1 IF EXISTS;
CREATE EXTERNAL DATASET ds1(t1) USING S3 (
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp
new file mode 100644
index 0000000..5d3989b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.014.s3bucket.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+playground data_dir data/csv/error1_line_num.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp
new file mode 100644
index 0000000..75ba5d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.015.ddl.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+DROP DATASET ds2 IF EXISTS;
+CREATE EXTERNAL DATASET ds2(t3) USING S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="CSV"),
+("header"="false")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp
new file mode 100644
index 0000000..e6b24f3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.016.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
+
+FROM ds2 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp
new file mode 100644
index 0000000..c35d646
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.017.s3bucket.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+playground data_dir data/csv/error2_line_num.csv
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp
new file mode 100644
index 0000000..e6b24f3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/csv-warnings/query-dataset.018.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
+
+FROM ds2 v SELECT VALUE v ORDER BY v.f1 ASC;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp
new file mode 100644
index 0000000..2bd413b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.001.s3bucket.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+playground data_dir $$1.json:data/json/malformed-json-no-closing.json,$$2.json:data/json/double-150-11.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp
new file mode 100644
index 0000000..7112bb9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.002.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+DROP TYPE t1 IF EXISTS;
+CREATE TYPE t1 AS {};
+
+DROP DATASET ds1 IF EXISTS;
+CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="JSON")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp
new file mode 100644
index 0000000..5f2ad26
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.003.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// requesttype=application/json
+// param max-warnings:json=100
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp
new file mode 100644
index 0000000..36b2bab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/json-warnings/json-warnings.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml
index 87bd204..e8902b6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/objects/ObjectsQueries.xml
@@ -223,7 +223,7 @@
<expected-warn>Duplicate field name "name" (in line 22, at column 30)</expected-warn>
<expected-warn>Duplicate field name "id" (in line 22, at column 56)</expected-warn>
<expected-warn>Duplicate field name "f1" (in line 22, at column 70)</expected-warn>
- <expected-warn>Duplicate field name "id" (in line 22, at column 36)</expected-warn>
+ <expected-warn>Duplicate field name "id" (in line 22, at column 56)</expected-warn>
<expected-warn>Duplicate field name "f1" (in line 22, at column 83)</expected-warn>
<expected-warn>Duplicate field name "fname1" (in line 25, at column 45)</expected-warn>
</compilation-unit>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm
new file mode 100644
index 0000000..c3ce0a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.006.adm
@@ -0,0 +1 @@
+{ "f1": 1, "f2": "good", "f3": "recommend" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm
new file mode 100644
index 0000000..c56f0c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/csv-warnings/external_dataset.007.adm
@@ -0,0 +1,2 @@
+{ "f1": 1, "f2": "good", "f3": "recommend" }
+{ "f1": 3, "f2": "good", "f3": "recommend" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm
new file mode 100644
index 0000000..5bae5d1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/json-warnings/json-warnings.003.adm
@@ -0,0 +1 @@
+{ "double_value": 150.11 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml
index 0597d8f..6704d78 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_one_partition.xml
@@ -42,35 +42,44 @@
<test-case FilePath="external-dataset" check-warnings="true">
<compilation-unit name="aws/s3/csv-warnings">
<output-dir compare="Text">aws/s3/csv-warnings</output-dir>
- <expected-warn>Parsing error at data_dir/no_h_missing_fields.csv record 2 field 3: some fields are missing</expected-warn>
- <expected-warn>Parsing error at data_dir/no_h_no_closing_q.csv record 0 field 0: malformed input record ended inside quote</expected-warn>
- <expected-warn>Parsing error at record 0 field 0: malformed input record ended inside quote</expected-warn>
+ <expected-warn>Parsing error at data_dir/no_h_missing_fields.csv line 2 field 3: some fields are missing</expected-warn>
+ <expected-warn>Parsing error at data_dir/no_h_no_closing_q.csv line 2 field 0: malformed input record ended abruptly</expected-warn>
+ <expected-warn>Parsing error at line 2 field 0: malformed input record ended abruptly</expected-warn>
- <expected-warn>Parsing error at record 4 field 3: invalid value</expected-warn>
- <expected-warn>Parsing error at record 1 field 1: invalid value</expected-warn>
- <expected-warn>Parsing error at record 10 field 1: invalid value</expected-warn>
- <expected-warn>Parsing error at record 2 field 1: invalid value</expected-warn>
- <expected-warn>Parsing error at record 3 field 1: invalid value</expected-warn>
- <expected-warn>Parsing error at record 6 field 7: invalid value</expected-warn>
- <expected-warn>Parsing error at record 12 field 7: invalid value</expected-warn>
- <expected-warn>Parsing error at record 11 field 3: invalid value</expected-warn>
- <expected-warn>Parsing error at record 8 field 6: a quote should be in the beginning</expected-warn>
+ <expected-warn>Parsing error at line 5 field 3: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 2 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 11 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 3 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 4 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 7 field 7: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 13 field 7: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 12 field 3: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 9 field 6: a quote should be in the beginning</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 4 field 3: invalid value</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 1 field 1: invalid value</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 10 field 1: invalid value</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 2 field 1: invalid value</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 3 field 1: invalid value</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 6 field 7: invalid value</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 12 field 7: invalid value</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 11 field 3: invalid value</expected-warn>
- <expected-warn>Parsing error at data_dir/h_invalid_values.csv record 8 field 6: a quote should be in the beginning</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 5 field 3: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 2 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 11 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 3 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 4 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 7 field 7: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 13 field 7: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 12 field 3: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 9 field 6: a quote should be in the beginning</expected-warn>
+
+ <expected-warn>Parsing error at data_dir/error1_line_num.csv line 3 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn>
+ <expected-warn>Parsing error at data_dir/error2_line_num.csv line 4 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset" check-warnings="true">
<compilation-unit name="aws/s3/tsv-warnings">
<output-dir compare="Text">aws/s3/tsv-warnings</output-dir>
- <expected-warn>Parsing error at data_dir/no_h_missing_fields.tsv record 2 field 3: some fields are missing</expected-warn>
+ <expected-warn>Parsing error at data_dir/no_h_missing_fields.tsv line 2 field 3: some fields are missing</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="aws/s3/json-warnings">
+ <output-dir compare="Text">aws/s3/json-warnings</output-dir>
+ <expected-warn>Parsing error at data_dir/1.json line 3 field 0: malformed input record ended abruptly</expected-warn>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset" check-warnings="true">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index 4dfcbb5..f959f8d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@ -51,4 +51,8 @@
public String getStreamName() {
return "";
}
+
+ public String getPreviousStreamName() {
+ return "";
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index 9d9ff28..f544ca0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -19,9 +19,18 @@
package org.apache.asterix.external.api;
import java.io.IOException;
+import java.util.function.LongSupplier;
@FunctionalInterface
public interface IRecordConverter<I, O> {
public O convert(IRawRecord<? extends I> input) throws IOException;
+
+ /**
+ * Configures the converter with information suppliers from the {@link IRecordReader} data source.
+ *
+ * @param lineNumber line number supplier
+ */
+ default void configure(LongSupplier lineNumber) {
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index 9c9ec1c..c4dfdd0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.api;
import java.io.DataOutput;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -40,11 +41,11 @@
public boolean parse(IRawRecord<? extends T> record, DataOutput out) throws HyracksDataException;
/**
- * Sets the data source name supplier that this parser is receiving records from. The data source name could be
- * used for reporting, for example.
+ * Configures the parser with information suppliers from the {@link IRecordReader} data source.
*
* @param dataSourceName data source name supplier
+ * @param lineNumber line number supplier
*/
- default void setDataSourceName(Supplier<String> dataSourceName) {
+ default void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 95e83f2..cb97526 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -20,6 +20,7 @@
import java.io.Closeable;
import java.io.IOException;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -90,4 +91,8 @@
default Supplier<String> getDataSourceName() {
return ExternalDataConstants.EMPTY_STRING;
}
+
+ default LongSupplier getLineNumber() {
+ return ExternalDataConstants.NO_LINES;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
index 78240a0..8b930aa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/CSVToRecordWithMetadataAndPKConverter.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.converter;
import java.io.IOException;
+import java.util.function.LongSupplier;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -38,6 +39,7 @@
private final int valueIndex;
private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
private final CharArrayRecord record;
+ private LongSupplier lineNumber = ExternalDataConstants.NO_LINES;
public CSVToRecordWithMetadataAndPKConverter(final int valueIndex, final char delimiter, final ARecordType metaType,
final ARecordType recordType, final int[] keyIndicator, final int[] keyIndexes, final IAType[] keyTypes,
@@ -54,7 +56,7 @@
public RecordWithMetadataAndPK<char[]> convert(final IRawRecord<? extends char[]> input) throws IOException {
record.reset();
recordWithMetadata.reset();
- cursor.nextRecord(input.get(), input.size());
+ cursor.nextRecord(input.get(), input.size(), lineNumber.getAsLong());
int i = 0;
int j = 0;
FieldCursorForDelimitedDataParser.Result lastResult;
@@ -77,4 +79,9 @@
}
return recordWithMetadata;
}
+
+ @Override
+ public void configure(LongSupplier lineNumber) {
+ this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index a70848c..448d3f5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -88,12 +88,11 @@
in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE);
}
+ // Current file ready, point to the next file
+ nextFileIndex++;
if (notificationHandler != null) {
notificationHandler.notifyNewSource();
}
-
- // Current file ready, point to the next file
- nextFileIndex++;
return true;
}
@@ -116,8 +115,16 @@
@Override
public String getStreamName() {
- int currentFileIndex = nextFileIndex - 1;
- return currentFileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(currentFileIndex);
+ return getStreamNameAt(nextFileIndex - 1);
+ }
+
+ @Override
+ public String getPreviousStreamName() {
+ return getStreamNameAt(nextFileIndex - 2);
+ }
+
+ private String getStreamNameAt(int fileIndex) {
+ return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex);
}
/**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index a3f560d..4b86142 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.LongSupplier;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -35,7 +36,8 @@
protected boolean hasHeader;
protected boolean prevCharCR;
protected int newlineLength;
- protected int recordNumber = 0;
+ protected long beginLineNumber = 1;
+ protected long lineNumber = 1;
protected boolean newSource = false;
private static final List<String> recordReaderFormats =
Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_DELIMITED_TEXT,
@@ -60,7 +62,8 @@
public void resetForNewSource() {
super.resetForNewSource();
newSource = true;
- recordNumber = 0;
+ beginLineNumber = 1;
+ lineNumber = 1;
prevCharCR = false;
newlineLength = 0;
}
@@ -98,6 +101,7 @@
* consuming it until we have a chance to look at the char that
* follows.
*/
+ beginLineNumber = lineNumber;
newlineLength = 0; //length of terminating newline
prevCharCR = false; //true of prev char was CR
record.reset();
@@ -120,9 +124,11 @@
if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
+ ++lineNumber;
break;
}
if (prevCharCR) { //CR + notLF, we are at notLF
+ ++lineNumber;
newlineLength = 1;
break;
}
@@ -140,8 +146,16 @@
newSource = false;
continue;
}
- recordNumber++;
return true;
}
}
+
+ @Override
+ public LongSupplier getLineNumber() {
+ return this::getBeginLineNumber;
+ }
+
+ private long getBeginLineNumber() {
+ return beginLineNumber;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 81b8e41..3a502d0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.external.input.record.reader.stream;
-import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_IN_Q;
+import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_AT_EOF;
import java.io.IOException;
import java.util.Arrays;
@@ -60,7 +60,7 @@
@Override
public void notifyNewSource() {
if (!record.isEmptyRecord() && warnings.shouldWarn()) {
- ParseUtil.warn(warnings, getDataSourceName().get(), recordNumber, 0, REC_ENDED_IN_Q);
+ ParseUtil.warn(warnings, getPreviousStreamName(), lineNumber, 0, REC_ENDED_AT_EOF);
}
// restart for a new record from a new source
resetForNewSource();
@@ -90,6 +90,7 @@
if (done) {
return false;
}
+ beginLineNumber = lineNumber;
newlineLength = 0;
prevCharCR = false;
prevCharEscape = false;
@@ -106,7 +107,7 @@
if (readLength <= 0 || inQuote) {
// haven't read anything previously OR have read and in the middle and hit the end
if (inQuote && warnings.shouldWarn()) {
- ParseUtil.warn(warnings, getDataSourceName().get(), recordNumber, 0, REC_ENDED_IN_Q);
+ ParseUtil.warn(warnings, getDataSourceName().get(), lineNumber, 0, REC_ENDED_AT_EOF);
}
close();
return false;
@@ -117,13 +118,18 @@
}
boolean maybeInQuote = false;
for (; bufferPosn < bufferLength; ++bufferPosn) {
- if (inputBuffer[bufferPosn] == quote && escape == quote) {
+ char ch = inputBuffer[bufferPosn];
+ // count lines here since we need to also count the lines inside quotes
+ if (ch == ExternalDataConstants.LF || prevCharCR) {
+ lineNumber++;
+ }
+ if (ch == quote && escape == quote) {
inQuote |= maybeInQuote;
prevCharEscape |= maybeInQuote;
}
maybeInQuote = false;
if (!inQuote) {
- if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+ if (ch == ExternalDataConstants.LF) {
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn;
break;
@@ -132,20 +138,20 @@
newlineLength = 1;
break;
}
- prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
// if this is an opening quote, mark it
- inQuote = inputBuffer[bufferPosn] == quote && !prevCharEscape;
+ inQuote = ch == quote && !prevCharEscape;
// the escape != quote is for making an opening quote not an escape
- prevCharEscape = inputBuffer[bufferPosn] == escape && !prevCharEscape && escape != quote;
+ prevCharEscape = ch == escape && !prevCharEscape && escape != quote;
} else {
// if quote == escape and current char is quote, then it could be closing or escaping
- if (inputBuffer[bufferPosn] == quote && !prevCharEscape) {
+ if (ch == quote && !prevCharEscape) {
// this is most likely a closing quote. the outcome depends on the next char
inQuote = false;
maybeInQuote = true;
}
- prevCharEscape = inputBuffer[bufferPosn] == escape && !prevCharEscape && escape != quote;
+ prevCharEscape = ch == escape && !prevCharEscape && escape != quote;
}
+ prevCharCR = (ch == ExternalDataConstants.CR);
}
readLength = bufferPosn - startPosn;
if (readLength > 0) {
@@ -159,7 +165,6 @@
newSource = false;
continue;
}
- recordNumber++;
return true;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 1fb5b25..dfc60bc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -24,6 +24,7 @@
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
import static org.apache.asterix.external.util.ExternalDataConstants.LF;
import static org.apache.asterix.external.util.ExternalDataConstants.OPEN_BRACKET;
+import static org.apache.asterix.external.util.ExternalDataConstants.REC_ENDED_AT_EOF;
import static org.apache.asterix.external.util.ExternalDataConstants.SPACE;
import static org.apache.asterix.external.util.ExternalDataConstants.TAB;
@@ -32,14 +33,17 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.LongSupplier;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.ParseUtil;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
public class SemiStructuredRecordReader extends StreamRecordReader {
@@ -50,6 +54,7 @@
AFTER_COMMA // valid chars at this state: '{' to start a new nested record
}
+ private IWarningCollector warnings;
private int depth;
private boolean prevCharEscape;
private boolean inString;
@@ -57,8 +62,10 @@
private char recordEnd;
private boolean hasStarted;
private boolean hasFinished;
- private int recordNumber = 0;
+ private boolean isLastCharCR;
private State state = State.TOP_LEVEL;
+ private long beginLineNumber = 1;
+ private long lineNumber = 1;
private static final List<String> recordReaderFormats = Collections.unmodifiableList(
Arrays.asList(ExternalDataConstants.FORMAT_ADM, ExternalDataConstants.FORMAT_JSON_LOWER_CASE,
@@ -70,6 +77,7 @@
throws HyracksDataException {
super.configure(stream, config);
stream.setNotificationHandler(this);
+ warnings = ctx.getWarningCollector();
// set record opening char
recordStart = ExternalDataUtils.validateGetRecordStart(config);
// set record ending char
@@ -81,16 +89,22 @@
@Override
public void notifyNewSource() {
- if (hasStarted) {
- // TODO(ali): WARN
+ if (hasStarted && warnings.shouldWarn()) {
+ ParseUtil.warn(warnings, getPreviousStreamName(), lineNumber, 0, REC_ENDED_AT_EOF);
}
- recordNumber = 0;
+ beginLineNumber = 1;
+ lineNumber = 1;
state = State.TOP_LEVEL;
resetForNewRecord();
}
- public int getRecordNumber() {
- return recordNumber;
+ @Override
+ public LongSupplier getLineNumber() {
+ return this::getBeginLineNumber;
+ }
+
+ private long getBeginLineNumber() {
+ return beginLineNumber;
}
@Override
@@ -99,12 +113,16 @@
return false;
}
resetForNewRecord();
+ beginLineNumber = lineNumber;
do {
int startPosn = bufferPosn; // starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = reader.read(inputBuffer);
if (bufferLength < 0) {
+ if (hasStarted && warnings.shouldWarn()) {
+ ParseUtil.warn(warnings, getDataSourceName().get(), lineNumber, 0, REC_ENDED_AT_EOF);
+ }
close();
return false; // EOF
}
@@ -112,6 +130,10 @@
if (!hasStarted) {
for (; bufferPosn < bufferLength; ++bufferPosn) { // search for record begin
char c = inputBuffer[bufferPosn];
+ if (c == LF || isLastCharCR) {
+ lineNumber++;
+ }
+ isLastCharCR = c == CR;
if (c == SPACE || c == TAB || c == LF || c == CR) {
continue;
}
@@ -144,18 +166,22 @@
}
if (hasStarted) {
for (; bufferPosn < bufferLength; ++bufferPosn) {
+ char c = inputBuffer[bufferPosn];
+ if (c == LF || isLastCharCR) {
+ lineNumber++;
+ }
if (inString) {
// we are in a string, we only care about the string end
- if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE && !prevCharEscape) {
+ if (c == ExternalDataConstants.QUOTE && !prevCharEscape) {
inString = false;
}
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE && !prevCharEscape;
+ prevCharEscape = c == ExternalDataConstants.ESCAPE && !prevCharEscape;
} else {
- if (inputBuffer[bufferPosn] == ExternalDataConstants.QUOTE) {
+ if (c == ExternalDataConstants.QUOTE) {
inString = true;
- } else if (inputBuffer[bufferPosn] == recordStart) {
+ } else if (c == recordStart) {
depth += 1;
- } else if (inputBuffer[bufferPosn] == recordEnd) {
+ } else if (c == recordEnd) {
depth -= 1;
if (depth == 0) {
hasFinished = true;
@@ -164,6 +190,7 @@
}
}
}
+ isLastCharCR = c == CR;
}
}
@@ -179,7 +206,6 @@
}
} while (!hasFinished);
record.endRecord();
- recordNumber++;
return true;
}
@@ -198,6 +224,7 @@
hasStarted = false;
hasFinished = false;
prevCharEscape = false;
+ isLastCharCR = false;
inString = false;
depth = 0;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 6139f82..cb16de5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -48,6 +48,7 @@
protected boolean done = false;
protected FeedLogManager feedLogManager;
private Supplier<String> dataSourceName = EMPTY_STRING;
+ private Supplier<String> previousDataSourceName = EMPTY_STRING;
public void configure(AsterixInputStream inputStream, Map<String, String> config) {
this.reader = new AsterixInputStreamReader(inputStream);
@@ -55,6 +56,7 @@
inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
if (!ExternalDataUtils.isTrue(config, KEY_REDACT_WARNINGS)) {
this.dataSourceName = reader::getStreamName;
+ this.previousDataSourceName = reader::getPreviousStreamName;
}
}
@@ -118,6 +120,10 @@
return dataSourceName;
}
+ String getPreviousStreamName() {
+ return previousDataSourceName.get();
+ }
+
public abstract List<String> getRecordReaderFormats();
public abstract String getRequiredConfigs();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index f5f68fe..4e963e4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -131,4 +131,8 @@
public String getStreamName() {
return in.getStreamName();
}
+
+ public String getPreviousStreamName() {
+ return in.getPreviousStreamName();
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 4fec3c4..9e1b052 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -37,6 +37,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private final FileSystemWatcher watcher;
private File currentFile;
+ private String lastFileName = "";
public LocalFSInputStream(FileSystemWatcher watcher) {
this.watcher = watcher;
@@ -90,6 +91,10 @@
@Override
protected boolean advance() throws IOException {
+ String tmpLastFileName = "";
+ if (currentFile != null) {
+ tmpLastFileName = currentFile.getPath();
+ }
closeFile();
currentFile = watcher.poll();
if (currentFile == null) {
@@ -100,6 +105,7 @@
}
if (currentFile != null) {
in = new FileInputStream(currentFile);
+ lastFileName = tmpLastFileName;
if (notificationHandler != null) {
notificationHandler.notifyNewSource();
}
@@ -156,4 +162,9 @@
public String getStreamName() {
return currentFile == null ? "" : currentFile.getPath();
}
+
+ @Override
+ public String getPreviousStreamName() {
+ return lastFileName;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 8ac483e..60e6e77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.asterix.builders.IARecordBuilder;
@@ -65,6 +66,7 @@
private final IValueParser[] valueParsers;
private FieldCursorForDelimitedDataParser cursor;
private Supplier<String> dataSourceName;
+ private LongSupplier lineNumber;
private final byte[] fieldTypeTags;
private final int[] fldIds;
private final ArrayBackedValueStorage[] nameBuffers;
@@ -74,6 +76,7 @@
char quote, boolean hasHeader, ARecordType recordType, boolean isStreamParser, String nullString)
throws HyracksDataException {
this.dataSourceName = ExternalDataConstants.EMPTY_STRING;
+ this.lineNumber = ExternalDataConstants.NO_LINES;
this.warnings = ctx.getWarningCollector();
this.fieldDelimiter = fieldDelimiter;
this.quote = quote;
@@ -114,7 +117,8 @@
}
}
if (!isStreamParser) {
- cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote, warnings, dataSourceName);
+ cursor = new FieldCursorForDelimitedDataParser(null, this.fieldDelimiter, quote, warnings,
+ this::getDataSourceName);
}
this.nullChars = nullString != null ? nullString.toCharArray() : null;
}
@@ -122,7 +126,7 @@
@Override
public boolean parse(DataOutput out) throws HyracksDataException {
try {
- while (cursor.nextRecord()) {
+ if (cursor.nextRecord()) {
if (parseRecord()) {
recBuilder.write(out, true);
return true;
@@ -149,7 +153,7 @@
break;
case END:
if (warnings.shouldWarn()) {
- ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(),
+ ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(),
cursor.getFieldCount(), MISSING_FIELDS);
}
return false;
@@ -164,8 +168,10 @@
fieldValueBufferOutput.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
} else {
if (cursor.isFieldEmpty() && !canProcessEmptyField(recordType.getFieldTypes()[i])) {
- ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(), cursor.getFieldCount(),
- EMPTY_FIELD);
+ if (warnings.shouldWarn()) {
+ ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(),
+ cursor.getFieldCount(), EMPTY_FIELD);
+ }
return false;
}
fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
@@ -176,8 +182,10 @@
boolean success = valueParsers[i].parse(cursor.getBuffer(), cursor.getFieldStart(),
cursor.getFieldLength(), fieldValueBufferOutput);
if (!success) {
- ParseUtil.warn(warnings, dataSourceName.get(), cursor.getRecordCount(), cursor.getFieldCount(),
- INVALID_VAL);
+ if (warnings.shouldWarn()) {
+ ParseUtil.warn(warnings, dataSourceName.get(), cursor.getLineCount(),
+ cursor.getFieldCount(), INVALID_VAL);
+ }
return false;
}
}
@@ -190,12 +198,19 @@
throw HyracksDataException.create(e);
}
}
- return true;
+ try {
+ while (cursor.nextField() == FieldCursorForDelimitedDataParser.Result.OK) {
+ // keep reading and discarding the extra fields
+ }
+ return true;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
}
@Override
public boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
- cursor.nextRecord(record.get(), record.size());
+ cursor.nextRecord(record.get(), record.size(), lineNumber.getAsLong());
if (parseRecord()) {
recBuilder.write(out, true);
return true;
@@ -207,7 +222,7 @@
public void setInputStream(InputStream in) throws IOException {
// TODO(ali): revisit this in regards to stream
cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote, warnings,
- dataSourceName);
+ this::getDataSourceName);
if (hasHeader) {
cursor.nextRecord();
FieldCursorForDelimitedDataParser.Result result;
@@ -224,13 +239,19 @@
public boolean reset(InputStream in) throws IOException {
// TODO(ali): revisit this in regards to stream
cursor = new FieldCursorForDelimitedDataParser(new InputStreamReader(in), fieldDelimiter, quote, warnings,
- dataSourceName);
+ this::getDataSourceName);
return true;
}
@Override
- public void setDataSourceName(Supplier<String> dataSourceName) {
+ public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName;
+ this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
+
+ }
+
+ private String getDataSourceName() {
+ return dataSourceName.get();
}
private static boolean canProcessEmptyField(IAType fieldType) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index d799f22..820775c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -20,6 +20,8 @@
import java.io.DataOutput;
import java.io.IOException;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.external.api.IDataParser;
@@ -111,4 +113,10 @@
public void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws HyracksDataException {
rwm.appendPrimaryKeyToTuple(tb);
}
+
+ @Override
+ public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
+ this.recordParser.configure(dataSourceName, lineNumber);
+ this.converter.configure(lineNumber);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 2644f3d..f60ecdc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -71,7 +71,7 @@
IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
// TODO(ali): revisit to think about passing data source name via setter or via createRecordParser
- dataParser.setDataSourceName(recordReader.getDataSourceName());
+ dataParser.configure(recordReader.getDataSourceName(), recordReader.getLineNumber());
if (indexingOp) {
return new IndexingDataFlowController(ctx, dataParser, recordReader,
((IIndexingDatasource) recordReader).getIndexer());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index b0acf44..63f57b6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
public class ExternalDataConstants {
@@ -253,6 +254,7 @@
public static final int MAX_RECORD_SIZE = 32000000;
public static final Supplier<String> EMPTY_STRING = () -> "";
+ public static final LongSupplier NO_LINES = () -> -1;
/**
* Expected parameter values
@@ -270,7 +272,7 @@
public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
public static final String MISSING_FIELDS = "some fields are missing";
- public static final String REC_ENDED_IN_Q = "malformed input record ended inside quote";
+ public static final String REC_ENDED_AT_EOF = "malformed input record ended abruptly";
public static final String EMPTY_FIELD = "empty value";
public static final String INVALID_VAL = "invalid value";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
index 129f28a..598d9ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ParseUtil.java
@@ -30,9 +30,9 @@
private ParseUtil() {
}
- public static void warn(IWarningCollector warningCollector, String dataSourceName, int recordNum, int fieldNum,
+ public static void warn(IWarningCollector warningCollector, String dataSourceName, long lineNum, int fieldNum,
String warnMessage) {
warningCollector.warn(
- Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName, recordNum, fieldNum, warnMessage));
+ Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName, lineNum, fieldNum, warnMessage));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 4e3cf4e..ec536c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -140,7 +140,7 @@
121 = A numeric type promotion error has occurred: %1$s
122 = Encountered an error while printing the plan
123 = Insufficient memory is provided for the join operators, please increase the join memory budget.
-124 = Parsing error at %1$s record %2$s field %3$s: %4$s
+124 = Parsing error at %1$s line %2$s field %3$s: %4$s
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
index ed2777b..48cfca6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FieldCursorForDelimitedDataParser.java
@@ -54,7 +54,7 @@
private char[] buffer; //buffer to holds the input coming form the underlying input stream
private int fStart; //start position for field
private int fEnd; //end position for field
- private int recordCount; //count of records
+ private long lineCount; //count of lines
private int fieldCount; //count of fields in current record
private int doubleQuoteCount; //count of double quotes
private boolean isDoubleQuoteIncludedInThisField; //does current field include double quotes
@@ -99,7 +99,7 @@
doubleQuoteCount = 0;
startedQuote = false;
isDoubleQuoteIncludedInThisField = false;
- recordCount = 0;
+ lineCount = 1;
fieldCount = 0;
}
@@ -127,12 +127,12 @@
return fieldCount;
}
- public int getRecordCount() {
- return recordCount;
+ public long getLineCount() {
+ return lineCount;
}
- public void nextRecord(char[] buffer, int recordLength) {
- recordCount++;
+ public void nextRecord(char[] buffer, int recordLength, long lineNumber) {
+ lineCount = lineNumber;
fieldCount = 0;
lastDelimiterPosition = -1;
lastQuotePosition = -1;
@@ -148,7 +148,6 @@
}
public boolean nextRecord() throws IOException {
- recordCount++;
fieldCount = 0;
while (true) {
switch (state) {
@@ -164,6 +163,7 @@
case IN_RECORD:
int p = start;
+ char lastChar = '\0';
while (true) {
if (p >= end) {
int s = start;
@@ -204,6 +204,11 @@
lastDelimiterPosition = p;
break;
}
+ // count lines inside quotes
+ if (ch == '\r' || (ch == '\n' && lastChar != '\r')) {
+ lineCount++;
+ }
+ lastChar = ch;
++p;
}
break;
@@ -217,6 +222,10 @@
}
}
char ch = buffer[start];
+ // if the next char "ch" is not \n, then count the \r
+ if (ch != '\n') {
+ lineCount++;
+ }
if (ch == '\n' && !startedQuote) {
++start;
state = State.EOR;
@@ -226,6 +235,7 @@
}
case EOR:
+ lineCount++;
if (start >= end) {
eof = !readMore();
if (eof) {
@@ -265,6 +275,7 @@
quoteCount = 0;
doubleQuoteCount = 0;
+ char lastChar = '\0';
int p = start;
while (true) {
if (p >= end) {
@@ -380,6 +391,11 @@
return Result.OK;
}
}
+ // count lines inside quotes
+ if (ch == '\r' || (ch == '\n' && lastChar != '\r')) {
+ lineCount++;
+ }
+ lastChar = ch;
++p;
}
}
@@ -434,7 +450,7 @@
}
private void warn(String message) {
- warnings.warn(Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName.get(), recordCount,
- fieldCount, message));
+ warnings.warn(Warning.forHyracks(SRC_LOC, ErrorCode.PARSING_ERROR, dataSourceName.get(), lineCount, fieldCount,
+ message));
}
}