Fixed CSV parser to recognize quote and delimiter inside a string.
Change-Id: Iac102286ff90d2b4cc54b1183fa024dec006c3b3
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/134
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
index 922c078..28fe5e6 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -66,7 +66,10 @@
break;
default:
- throw new HyracksDataException("Encountered " + ch);
+ String errorString = new String(buffer, i + start, length - i);
+ throw new HyracksDataException(
+ "Integer Parser - a digit is expected. But, encountered this character: " + ch
+ + " in the incoming input: " + errorString);
}
}
boolean post = false;
@@ -85,6 +88,11 @@
case '9':
n = n * 10 + (ch - '0');
break;
+ default:
+ String errorString = new String(buffer, i + start, length - i);
+ throw new HyracksDataException(
+ "Integer Parser - a digit is expected. But, encountered this character: " + ch
+ + " in the incoming input: " + errorString);
}
}
@@ -99,7 +107,10 @@
break;
default:
- throw new HyracksDataException("Encountered " + ch);
+ String errorString = new String(buffer, i + start, length - i);
+ throw new HyracksDataException("Integer Parser - a whitespace, tab, new line, or "
+ + "form-feed expected. But, encountered this character: " + ch
+ + " in the incoming input: " + errorString);
}
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
index 33e8e03..e34ac68 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -66,7 +66,10 @@
break;
default:
- throw new HyracksDataException("Encountered " + ch);
+ String errorString = new String(buffer, i + start, length - i);
+ throw new HyracksDataException(
+ "Long Parser - a digit is expected. But, encountered this character: " + ch
+ + " in the incoming input: " + errorString);
}
}
boolean post = false;
@@ -85,6 +88,11 @@
case '9':
n = n * 10 + (ch - '0');
break;
+ default:
+ String errorString = new String(buffer, i + start, length - i);
+ throw new HyracksDataException(
+ "Long Parser - a digit is expected. But, encountered this character: " + ch
+ + " in the incoming input: " + errorString);
}
}
@@ -99,10 +107,14 @@
break;
default:
- throw new HyracksDataException("Encountered " + ch);
+ String errorString = new String(buffer, i + start, length - i);
+ throw new HyracksDataException(
+ "Long Parser - a whitespace, tab, new line, or form-feed expected. "
+ + "But, encountered this character: " + ch + " in the incoming input: "
+ + errorString);
}
}
-
+
try {
out.writeLong(n * sign);
} catch (IOException e) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index ba8cfd6..dd1343d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -35,10 +35,24 @@
private static final long serialVersionUID = 1L;
private IValueParserFactory[] valueParserFactories;
private char fieldDelimiter;
+ private char quote;
+
+ private boolean isDoubleQuoteIncludedInThisField;
+ private int doubleQuoteCount;
+
+ private int lineCount;
+ private int fieldCount;
public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter) {
+ this(fieldParserFactories,fieldDelimiter, '\"');
+ }
+ // Full constructor: the two-argument form above delegates here, passing the double-quote character as the quote.
+ public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter, char quote) {
this.valueParserFactories = fieldParserFactories;
this.fieldDelimiter = fieldDelimiter;
+ this.quote = quote;
+ this.fieldCount = 0; // incremented after each parsed field; reported as (fieldCount+1) in quote-error messages
+ this.lineCount = 1; // incremented when a line feed ends a field; reported in quote-error messages
}
@Override
@@ -64,14 +78,22 @@
if (!cursor.nextField()) {
break;
}
+ // Eliminate double quotes in the field that we are going to parse
+ if (isDoubleQuoteIncludedInThisField) {
+ eliminateDoulbleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+ cursor.fEnd -= doubleQuoteCount;
+ isDoubleQuoteIncludedInThisField = false;
+ }
valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
tb.addFieldEndOffset();
+ fieldCount++;
}
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+ throw new HyracksDataException("Record size (" + tb.getSize()
+ + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
}
}
}
@@ -87,10 +109,14 @@
}
private enum State {
- INIT, IN_RECORD, EOR, CR, EOF
+ INIT, // initial state assigned in the FieldCursor constructor
+ IN_RECORD, // cursor is positioned inside a record; fields are scanned while in this state
+ EOR, // assigned after a line feed terminates a record or field
+ CR, // assigned after a carriage return is seen
+ EOF // assigned when readMore() reports that no more input is available
}
- private class FieldCursor {
+ protected class FieldCursor {
private static final int INITIAL_BUFFER_SIZE = 4096;
private static final int INCREMENT = 4096;
@@ -104,12 +130,23 @@
private int fStart;
private int fEnd;
+ private int lastQuotePosition;
+ private int lastDoubleQuotePosition;
+ private int lastDelimiterPosition;
+ private int quoteCount;
+ private boolean startedQuote;
+
public FieldCursor(Reader in) {
this.in = in;
buffer = new char[INITIAL_BUFFER_SIZE];
start = 0;
end = 0;
state = State.INIT;
+ lastDelimiterPosition = -99;
+ lastQuotePosition = -99;
+ lastDoubleQuotePosition = -99;
+ quoteCount = 0;
+ startedQuote = false;
}
public boolean nextRecord() throws IOException {
@@ -136,13 +173,32 @@
return start < end;
}
p -= (s - start);
+ lastQuotePosition -= (s - start);
+ lastDoubleQuotePosition -= (s - start);
+ lastDelimiterPosition -= (s - start);
}
char ch = buffer[p];
- if (ch == '\n') {
+ // We perform rough format correctness (delimiter, quote) check here
+ // to set the starting position of a record.
+ // In the field level, more checking will be conducted.
+ if (ch == quote) {
+ startedQuote = true;
+ // check two quotes in a row - "". This is an escaped quote
+ if (lastQuotePosition == p - 1 && start != p - 1 && lastDoubleQuotePosition != p - 1) {
+ lastDoubleQuotePosition = p;
+ }
+ lastQuotePosition = p;
+ } else if (ch == fieldDelimiter) {
+ if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1) {
+ startedQuote = false;
+ lastDelimiterPosition = p;
+ }
+ } else if (ch == '\n' && !startedQuote) {
start = p + 1;
state = State.EOR;
+ lastDelimiterPosition = p;
break;
- } else if (ch == '\r') {
+ } else if (ch == '\r' && !startedQuote) {
start = p + 1;
state = State.CR;
break;
@@ -160,7 +216,7 @@
}
}
char ch = buffer[start];
- if (ch == '\n') {
+ if (ch == '\n' && !startedQuote) {
++start;
state = State.EOR;
} else {
@@ -177,6 +233,7 @@
}
}
state = State.IN_RECORD;
+ lastDelimiterPosition = start;
return start < end;
case EOF:
@@ -195,35 +252,135 @@
case IN_RECORD:
boolean eof;
+ // reset quote related values
+ startedQuote = false;
+ isDoubleQuoteIncludedInThisField = false;
+ lastQuotePosition = -99;
+ lastDoubleQuotePosition = -99;
+ quoteCount = 0;
+ doubleQuoteCount = 0;
+
int p = start;
while (true) {
if (p >= end) {
int s = start;
eof = !readMore();
+ p -= (s - start);
+ lastQuotePosition -= (s - start);
+ lastDoubleQuotePosition -= (s - start);
+ lastDelimiterPosition -= (s - start);
if (eof) {
state = State.EOF;
+ if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+ && quoteCount == doubleQuoteCount * 2 + 2) {
+ // set the position of fStart to +1, fEnd to -1 to remove quote character
+ fStart = start + 1;
+ fEnd = p - 1;
+ } else {
+ fStart = start;
+ fEnd = p;
+ }
return true;
}
- p -= (s - start);
}
char ch = buffer[p];
- if (ch == fieldDelimiter) {
- fStart = start;
- fEnd = p;
- start = p + 1;
- return true;
+ if (ch == quote) {
+ // If this is first quote in the field, then it needs to be placed in the beginning.
+ if (!startedQuote) {
+ if (lastDelimiterPosition == p - 1 || lastDelimiterPosition == -99) {
+ startedQuote = true;
+ } else {
+ // In this case, we don't have a quote in the beginning of a field.
+ throw new IOException(
+ "At line: "
+ + lineCount
+ + ", field#: "
+ + (fieldCount+1)
+ + " - a quote enclosing a field needs to be placed in the beginning of that field.");
+ }
+ }
+ // Check for an escaped (doubled) quote - "". We also require
+ // [lastDelimiterPosition != p - 2] to avoid a false positive when a quoted field is empty:
+ // its opening and closing quotes are adjacent, but they are not an escaped quote.
+ // (e.g. if field2 has no value:
+ // field1,"",field3 ... )
+ if (lastQuotePosition == p - 1 && lastDelimiterPosition != p - 2
+ && lastDoubleQuotePosition != p - 1) {
+ isDoubleQuoteIncludedInThisField = true;
+ doubleQuoteCount++;
+ lastDoubleQuotePosition = p;
+ }
+ lastQuotePosition = p;
+ quoteCount++;
+ } else if (ch == fieldDelimiter) {
+ // If there was no quote in the field,
+ // then we assume that the field contains a valid string.
+ if (!startedQuote) {
+ fStart = start;
+ fEnd = p;
+ start = p + 1;
+ lastDelimiterPosition = p;
+ return true;
+ } else if (startedQuote) {
+ if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1) {
+ // A quote sits right before the delimiter (e.g. ",) and it is not the second half
+ // of an escaped quote (""), so the field contains a valid quoted string.
+ // We set fStart to start + 1 and fEnd to p - 1 to strip the enclosing quote characters.
+ fStart = start + 1;
+ fEnd = p - 1;
+ start = p + 1;
+ lastDelimiterPosition = p;
+ return true;
+ } else if (lastQuotePosition < p - 1 && lastQuotePosition != lastDoubleQuotePosition
+ && quoteCount == doubleQuoteCount * 2 + 2) {
+ // There is a quote before the delimiter, however it is not directly placed before the delimiter.
+ // In this case, we throw an exception.
+ // quoteCount == doubleQuoteCount * 2 + 2 : only true when we have two quotes except double-quotes.
+ throw new IOException("At line: " + lineCount + ", field#: " + (fieldCount+1)
+ + " - A quote enclosing a field needs to be followed by the delimiter.");
+ }
+ }
+ // If the control flow reaches here: we have a delimiter in this field and
+ // there should be a quote in the beginning and the end of
+ // this field. So, just continue reading next character
} else if (ch == '\n') {
- fStart = start;
- fEnd = p;
- start = p + 1;
- state = State.EOR;
- return true;
+ if (!startedQuote) {
+ fStart = start;
+ fEnd = p;
+ start = p + 1;
+ state = State.EOR;
+ lineCount++;
+ lastDelimiterPosition = p;
+ return true;
+ } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+ && quoteCount == doubleQuoteCount * 2 + 2) {
+ // set the position of fStart to +1, fEnd to -1 to remove quote character
+ fStart = start + 1;
+ fEnd = p - 1;
+ lastDelimiterPosition = p;
+ start = p + 1;
+ state = State.EOR;
+ lineCount++;
+ return true;
+ }
} else if (ch == '\r') {
- fStart = start;
- fEnd = p;
- start = p + 1;
- state = State.CR;
- return true;
+ if (!startedQuote) {
+ fStart = start;
+ fEnd = p;
+ start = p + 1;
+ state = State.CR;
+ lastDelimiterPosition = p;
+ return true;
+ } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+ && quoteCount == doubleQuoteCount * 2 + 2) {
+ // set the position of fStart to +1, fEnd to -1 to remove quote character
+ fStart = start + 1;
+ fEnd = p - 1;
+ lastDelimiterPosition = p;
+ start = p + 1;
+ state = State.CR;
+ return true;
+ }
}
++p;
}
@@ -231,7 +388,16 @@
throw new IllegalStateException();
}
- private boolean readMore() throws IOException {
+ protected void resetQuoteRelatedValue() { // restores the quote-tracking state below; lastDelimiterPosition is not touched
+ startedQuote = false; // no opening quote seen yet
+ isDoubleQuoteIncludedInThisField = false; // flag declared on the enclosing factory, read by the field-parsing loop
+ lastQuotePosition = -99; // -99 is the same placeholder the FieldCursor constructor assigns to these positions
+ lastDoubleQuotePosition = -99;
+ quoteCount = 0;
+ doubleQuoteCount = 0; // counter declared on the enclosing factory
+ } // NOTE(review): same assignments as the inline reset in the IN_RECORD case; no call site is visible in this patch
+
+ protected boolean readMore() throws IOException {
if (start > 0) {
System.arraycopy(buffer, start, buffer, 0, end - start);
}
@@ -249,5 +415,29 @@
end += n;
return true;
}
+
+ }
+
+ // Collapses escaped quote pairs in buffer[start, start + length) in place; returns nothing, so the visible caller shrinks fEnd by doubleQuoteCount.
+ protected void eliminateDoulbleQuote(char[] buffer, int start, int length) {
+ int lastDoubleQuotePosition = -99; // index of the most recently dropped quote; -99 presumably means none dropped yet
+ int writepos = start; // next slot that receives a kept character
+ int readpos = start; // next character to examine
+ // Scan exactly length characters, compacting the kept ones toward start
+ for (int i = 0; i < length; i++) {
+ // Drop a quote unless the character right before it was itself a dropped quote (then it is the kept half of a pair)
+ if (buffer[readpos] == quote && lastDoubleQuotePosition != readpos - 1) {
+ lastDoubleQuotePosition = readpos;
+ readpos++;
+ } else {
+ // Keep this character; a copy is only needed once a gap has opened, i.e. after at least one quote was dropped
+ if (writepos != readpos) {
+ buffer[writepos] = buffer[readpos];
+ }
+ writepos++;
+ readpos++;
+ }
+ }
}
}
+