Fixed CSV parser to recognize quote and delimiter inside a string.

Change-Id: Iac102286ff90d2b4cc54b1183fa024dec006c3b3
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/134
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
index 922c078..28fe5e6 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -66,7 +66,10 @@
                             break;
 
                         default:
-                            throw new HyracksDataException("Encountered " + ch);
+                            String errorString = new String(buffer, i + start, length - i);
+                            throw new HyracksDataException(
+                                    "Integer Parser - a digit is expected. But, encountered this character: " + ch
+                                            + " in the incoming input: " + errorString);
                     }
                 }
                 boolean post = false;
@@ -85,6 +88,11 @@
                         case '9':
                             n = n * 10 + (ch - '0');
                             break;
+                        default:
+                            String errorString = new String(buffer, i + start, length - i);
+                            throw new HyracksDataException(
+                                    "Integer Parser - a digit is expected. But, encountered this character: " + ch
+                                            + " in the incoming input: " + errorString);
                     }
                 }
 
@@ -99,7 +107,10 @@
                             break;
 
                         default:
-                            throw new HyracksDataException("Encountered " + ch);
+                            String errorString = new String(buffer, i + start, length - i);
+                            throw new HyracksDataException("Integer Parser - a whitespace, tab, new line, or "
+                                    + "form-feed expected. But, encountered this character: " + ch
+                                    + " in the incoming input: " + errorString);
                     }
                 }
 
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
index 33e8e03..e34ac68 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -66,7 +66,10 @@
                             break;
 
                         default:
-                            throw new HyracksDataException("Encountered " + ch);
+                            String errorString = new String(buffer, i + start, length - i);
+                            throw new HyracksDataException(
+                                    "Long Parser - a digit is expected. But, encountered this character: " + ch
+                                            + " in the incoming input: " + errorString);
                     }
                 }
                 boolean post = false;
@@ -85,6 +88,11 @@
                         case '9':
                             n = n * 10 + (ch - '0');
                             break;
+                        default:
+                            String errorString = new String(buffer, i + start, length - i);
+                            throw new HyracksDataException(
+                                    "Long Parser - a digit is expected. But, encountered this character: " + ch
+                                            + " in the incoming input: " + errorString);
                     }
                 }
 
@@ -99,10 +107,14 @@
                             break;
 
                         default:
-                            throw new HyracksDataException("Encountered " + ch);
+                            String errorString = new String(buffer, i + start, length - i);
+                            throw new HyracksDataException(
+                                    "Long Parser - a whitespace, tab, new line, or form-feed expected. "
+                                            + "But, encountered this character: " + ch + " in the incoming input: "
+                                            + errorString);
                     }
                 }
-                
+
                 try {
                     out.writeLong(n * sign);
                 } catch (IOException e) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index ba8cfd6..dd1343d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -35,10 +35,24 @@
     private static final long serialVersionUID = 1L;
     private IValueParserFactory[] valueParserFactories;
     private char fieldDelimiter;
+    private char quote;
+
+    private boolean isDoubleQuoteIncludedInThisField;
+    private int doubleQuoteCount;
+
+    private int lineCount;
+    private int fieldCount;
 
     public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter) {
+        this(fieldParserFactories,fieldDelimiter, '\"');
+    }
+
+    public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter, char quote) {
         this.valueParserFactories = fieldParserFactories;
         this.fieldDelimiter = fieldDelimiter;
+        this.quote = quote;
+        this.fieldCount = 0;
+        this.lineCount = 1;
     }
 
     @Override
@@ -64,14 +78,22 @@
                             if (!cursor.nextField()) {
                                 break;
                             }
+                            // Eliminate double quotes in the field that we are going to parse
+                            if (isDoubleQuoteIncludedInThisField) {
+                                eliminateDoulbleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
+                                cursor.fEnd -= doubleQuoteCount;
+                                isDoubleQuoteIncludedInThisField = false;
+                            }
                             valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
                             tb.addFieldEndOffset();
+                            fieldCount++;
                         }
                         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                             FrameUtils.flushFrame(frame, writer);
                             appender.reset(frame, true);
                             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+                                throw new HyracksDataException("Record size (" + tb.getSize()
+                                        + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
                             }
                         }
                     }
@@ -87,10 +109,14 @@
     }
 
     private enum State {
-        INIT, IN_RECORD, EOR, CR, EOF
+        INIT,
+        IN_RECORD,
+        EOR,
+        CR,
+        EOF
     }
 
-    private class FieldCursor {
+    protected class FieldCursor {
         private static final int INITIAL_BUFFER_SIZE = 4096;
         private static final int INCREMENT = 4096;
 
@@ -104,12 +130,23 @@
         private int fStart;
         private int fEnd;
 
+        private int lastQuotePosition;
+        private int lastDoubleQuotePosition;
+        private int lastDelimiterPosition;
+        private int quoteCount;
+        private boolean startedQuote;
+
         public FieldCursor(Reader in) {
             this.in = in;
             buffer = new char[INITIAL_BUFFER_SIZE];
             start = 0;
             end = 0;
             state = State.INIT;
+            lastDelimiterPosition = -99;
+            lastQuotePosition = -99;
+            lastDoubleQuotePosition = -99;
+            quoteCount = 0;
+            startedQuote = false;
         }
 
         public boolean nextRecord() throws IOException {
@@ -136,13 +173,32 @@
                                     return start < end;
                                 }
                                 p -= (s - start);
+                                lastQuotePosition -= (s - start);
+                                lastDoubleQuotePosition -= (s - start);
+                                lastDelimiterPosition -= (s - start);
                             }
                             char ch = buffer[p];
-                            if (ch == '\n') {
+                            // We perform a rough format check (delimiter, quote) here
+                            // to find the starting position of a record.
+                            // More thorough checking is done at the field level.
+                            if (ch == quote) {
+                                startedQuote = true;
+                                // check two quotes in a row - "". This is an escaped quote
+                                if (lastQuotePosition == p - 1 && start != p - 1 && lastDoubleQuotePosition != p - 1) {
+                                    lastDoubleQuotePosition = p;
+                                }
+                                lastQuotePosition = p;
+                            } else if (ch == fieldDelimiter) {
+                                if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1) {
+                                    startedQuote = false;
+                                    lastDelimiterPosition = p;
+                                }
+                            } else if (ch == '\n' && !startedQuote) {
                                 start = p + 1;
                                 state = State.EOR;
+                                lastDelimiterPosition = p;
                                 break;
-                            } else if (ch == '\r') {
+                            } else if (ch == '\r' && !startedQuote) {
                                 start = p + 1;
                                 state = State.CR;
                                 break;
@@ -160,7 +216,7 @@
                             }
                         }
                         char ch = buffer[start];
-                        if (ch == '\n') {
+                        if (ch == '\n' && !startedQuote) {
                             ++start;
                             state = State.EOR;
                         } else {
@@ -177,6 +233,7 @@
                             }
                         }
                         state = State.IN_RECORD;
+                        lastDelimiterPosition = start;
                         return start < end;
 
                     case EOF:
@@ -195,35 +252,135 @@
 
                 case IN_RECORD:
                     boolean eof;
+                    // reset quote related values
+                    startedQuote = false;
+                    isDoubleQuoteIncludedInThisField = false;
+                    lastQuotePosition = -99;
+                    lastDoubleQuotePosition = -99;
+                    quoteCount = 0;
+                    doubleQuoteCount = 0;
+
                     int p = start;
                     while (true) {
                         if (p >= end) {
                             int s = start;
                             eof = !readMore();
+                            p -= (s - start);
+                            lastQuotePosition -= (s - start);
+                            lastDoubleQuotePosition -= (s - start);
+                            lastDelimiterPosition -= (s - start);
                             if (eof) {
                                 state = State.EOF;
+                                if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+                                        && quoteCount == doubleQuoteCount * 2 + 2) {
+                                    // set the position of fStart to +1, fEnd to -1 to remove quote character
+                                    fStart = start + 1;
+                                    fEnd = p - 1;
+                                } else {
+                                    fStart = start;
+                                    fEnd = p;
+                                }
                                 return true;
                             }
-                            p -= (s - start);
                         }
                         char ch = buffer[p];
-                        if (ch == fieldDelimiter) {
-                            fStart = start;
-                            fEnd = p;
-                            start = p + 1;
-                            return true;
+                        if (ch == quote) {
+                            // If this is the first quote in the field, it must be placed at the beginning of the field.
+                            if (!startedQuote) {
+                                if (lastDelimiterPosition == p - 1 || lastDelimiterPosition == -99) {
+                                    startedQuote = true;
+                                } else {
+                                    // In this case, we don't have a quote in the beginning of a field.
+                                    throw new IOException(
+                                            "At line: "
+                                                    + lineCount
+                                                    + ", field#: "
+                                                    + (fieldCount+1)
+                                                    + " - a quote enclosing a field needs to be placed in the beginning of that field.");
+                                }
+                            }
+                            // Check for an escaped quote - "". We check [lastDelimiterPosition != p - 2]
+                            // to avoid a false positive when a field has no value,
+                            // since an empty quoted field looks like an escaped quote but is not one.
+                            // (e.g. if field2 has no value:
+                            //       field1,"",field3 ... )
+                            if (lastQuotePosition == p - 1 && lastDelimiterPosition != p - 2
+                                    && lastDoubleQuotePosition != p - 1) {
+                                isDoubleQuoteIncludedInThisField = true;
+                                doubleQuoteCount++;
+                                lastDoubleQuotePosition = p;
+                            }
+                            lastQuotePosition = p;
+                            quoteCount++;
+                        } else if (ch == fieldDelimiter) {
+                            // If there was no quote in the field,
+                            // then we assume that the field contains a valid string.
+                            if (!startedQuote) {
+                                fStart = start;
+                                fEnd = p;
+                                start = p + 1;
+                                lastDelimiterPosition = p;
+                                return true;
+                            } else if (startedQuote) {
+                                if (lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1) {
+                                    // A quote sits right before the delimiter (e.g. ",) and it is not the second
+                                    // quote of an escaped quote (""), so the field contains a valid string.
+                                    // We set fStart to start + 1 and fEnd to p - 1 to drop the enclosing quotes.
+                                    fStart = start + 1;
+                                    fEnd = p - 1;
+                                    start = p + 1;
+                                    lastDelimiterPosition = p;
+                                    return true;
+                                } else if (lastQuotePosition < p - 1 && lastQuotePosition != lastDoubleQuotePosition
+                                        && quoteCount == doubleQuoteCount * 2 + 2) {
+                                    // There is a closing quote before the delimiter, but not directly before it.
+                                    // In this case, we throw an exception.
+                                    // quoteCount == doubleQuoteCount * 2 + 2 : true only when exactly two quotes remain besides the escaped ones.
+                                    throw new IOException("At line: " + lineCount + ", field#: " + (fieldCount+1)
+                                            + " -  A quote enclosing a field needs to be followed by the delimiter.");
+                                }
+                            }
+                            // If the control flow reaches here: we have a delimiter in this field and
+                            // there should be a quote in the beginning and the end of
+                            // this field. So, just continue reading next character
                         } else if (ch == '\n') {
-                            fStart = start;
-                            fEnd = p;
-                            start = p + 1;
-                            state = State.EOR;
-                            return true;
+                            if (!startedQuote) {
+                                fStart = start;
+                                fEnd = p;
+                                start = p + 1;
+                                state = State.EOR;
+                                lineCount++;
+                                lastDelimiterPosition = p;
+                                return true;
+                            } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+                                    && quoteCount == doubleQuoteCount * 2 + 2) {
+                                // set the position of fStart to +1, fEnd to -1 to remove quote character
+                                fStart = start + 1;
+                                fEnd = p - 1;
+                                lastDelimiterPosition = p;
+                                start = p + 1;
+                                state = State.EOR;
+                                lineCount++;
+                                return true;
+                            }
                         } else if (ch == '\r') {
-                            fStart = start;
-                            fEnd = p;
-                            start = p + 1;
-                            state = State.CR;
-                            return true;
+                            if (!startedQuote) {
+                                fStart = start;
+                                fEnd = p;
+                                start = p + 1;
+                                state = State.CR;
+                                lastDelimiterPosition = p;
+                                return true;
+                            } else if (startedQuote && lastQuotePosition == p - 1 && lastDoubleQuotePosition != p - 1
+                                    && quoteCount == doubleQuoteCount * 2 + 2) {
+                                // set the position of fStart to +1, fEnd to -1 to remove quote character
+                                fStart = start + 1;
+                                fEnd = p - 1;
+                                lastDelimiterPosition = p;
+                                start = p + 1;
+                                state = State.CR;
+                                return true;
+                            }
                         }
                         ++p;
                     }
@@ -231,7 +388,16 @@
             throw new IllegalStateException();
         }
 
-        private boolean readMore() throws IOException {
+        protected void resetQuoteRelatedValue() {
+            startedQuote = false;
+            isDoubleQuoteIncludedInThisField = false;
+            lastQuotePosition = -99;
+            lastDoubleQuotePosition = -99;
+            quoteCount = 0;
+            doubleQuoteCount = 0;
+        }
+
+        protected boolean readMore() throws IOException {
             if (start > 0) {
                 System.arraycopy(buffer, start, buffer, 0, end - start);
             }
@@ -249,5 +415,29 @@
             end += n;
             return true;
         }
+
+    }
+
+    // Collapse each escaped quote ("") in a field into a single quote character, compacting the buffer in place
+    protected void eliminateDoulbleQuote(char[] buffer, int start, int length) {
+        int lastDoubleQuotePosition = -99;
+        int writepos = start;
+        int readpos = start;
+        // Walk the field once, dropping escape quotes and compacting the rest toward the front
+        for (int i = 0; i < length; i++) {
+            // Skip a quote unless the previous character was itself a skipped quote (keeps one quote per "")
+            if (buffer[readpos] == quote && lastDoubleQuotePosition != readpos - 1) {
+                lastDoubleQuotePosition = readpos;
+                readpos++;
+            } else {
+                // Moving characters except double quote to the front
+                if (writepos != readpos) {
+                    buffer[writepos] = buffer[readpos];
+                }
+                writepos++;
+                readpos++;
+            }
+        }
     }
 }
+