Introduce ("header"="true") option for CSV parsing

Change-Id: I473bc7b66d0c0d1355b5fb3e392cb9ece151e50e
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/226
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Tested-by: Chris Hillery <ceej@lambda.nu>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IParseFileSplitsDecl.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IParseFileSplitsDecl.java
index 4a9dad5..1a2c972 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IParseFileSplitsDecl.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/parse/IParseFileSplitsDecl.java
@@ -23,5 +23,7 @@
 
     public char getQuote();
 
+    public boolean getHasHeader();
+
     public FileSplit[] getSplits();
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 040f506..a11cc22 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -136,11 +136,12 @@
 
         char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
         char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
+        boolean hasHeader = StreamBasedAdapterFactory.getHasHeader(configuration);
 
         parserFactory = new HDFSIndexingParserFactory((ARecordType) atype,
-                (String) configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
-                (String) configuration.get(KEY_FORMAT), delimiter, quote,
-                (String) configuration.get(HDFSAdapterFactory.KEY_PARSER));
+                                                      configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
+                                                      configuration.get(KEY_FORMAT), delimiter, quote, hasHeader,
+                                                      configuration.get(HDFSAdapterFactory.KEY_PARSER));
     }
 
     /**
@@ -153,7 +154,7 @@
      * @return
      */
     @SuppressWarnings("null")
-    public static DelimitedDataParser getDilimitedDataParser(ARecordType recordType, char delimiter, char quote) {
+    public static DelimitedDataParser getDelimitedDataParser(ARecordType recordType, char delimiter, char quote, boolean hasHeader) {
         int n = recordType.getFieldTypes().length;
         IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
         for (int i = 0; i < n; i++) {
@@ -176,7 +177,7 @@
             }
             fieldParserFactories[i] = vpf;
         }
-        return new DelimitedDataParser(recordType, fieldParserFactories, delimiter, quote, false, -1, null);
+        return new DelimitedDataParser(recordType, fieldParserFactories, delimiter, quote, hasHeader, false, -1, null);
     }
 
     public static AlgebricksPartitionConstraint getClusterLocations() {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 8daa7d0..c3a3746 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -52,6 +52,7 @@
     public static final String KEY_PARSER_FACTORY = "parser";
     public static final String KEY_DELIMITER = "delimiter";
     public static final String KEY_QUOTE = "quote";
+    public static final String KEY_HEADER = "header";
     public static final String KEY_PATH = "path";
     public static final String KEY_SOURCE_DATATYPE = "output-type-name";
     // The length of a delimiter should be 1.
@@ -109,10 +110,11 @@
 
         char delimiter = getDelimiter(configuration);
         char quote = getQuote(configuration, delimiter);
+        boolean hasHeader = getHasHeader(configuration);
 
         return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, fieldParserFactories, delimiter,
-                quote, configuration) : new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories,
-                delimiter, quote, isPKAutoGenerated, primaryKeyPosition, origSourceDataTypeForAutoGeneratedPK);
+                quote, hasHeader, configuration) : new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories,
+                delimiter, quote, hasHeader, isPKAutoGenerated, primaryKeyPosition, origSourceDataTypeForAutoGeneratedPK);
     }
 
     protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType, boolean conditionalPush,
@@ -130,21 +132,21 @@
 
     protected void configureFormat(IAType sourceDatatype, boolean isPKAutoGenerated, int primaryKeyPosition,
             IAType origSourceDataTypeForAutoGeneratedPK) throws Exception {
-        String propValue = (String) configuration.get(BATCH_SIZE);
+        String propValue = configuration.get(BATCH_SIZE);
         int batchSize = propValue != null ? Integer.parseInt(propValue) : -1;
-        propValue = (String) configuration.get(BATCH_INTERVAL);
+        propValue = configuration.get(BATCH_INTERVAL);
         long batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
         boolean conditionalPush = batchSize > 0 || batchInterval > 0;
 
-        String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
+        String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
         if (parserFactoryClassname == null) {
-            String specifiedFormat = (String) configuration.get(KEY_FORMAT);
+            String specifiedFormat = configuration.get(KEY_FORMAT);
             if (specifiedFormat == null) {
                 throw new IllegalArgumentException(" Unspecified data format");
             } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
                 parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush,
                         isPKAutoGenerated, primaryKeyPosition, (ARecordType) origSourceDataTypeForAutoGeneratedPK);
-            } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
+            } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
                 parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush,
                         isPKAutoGenerated, primaryKeyPosition, (ARecordType) origSourceDataTypeForAutoGeneratedPK);
             } else {
@@ -158,7 +160,7 @@
 
     // Get a delimiter from the given configuration
     public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
-        String delimiterValue = (String) configuration.get(KEY_DELIMITER);
+        String delimiterValue = configuration.get(KEY_DELIMITER);
         if (delimiterValue == null) {
             delimiterValue = DEFAULT_DELIMITER;
         } else if (delimiterValue.length() != 1) {
@@ -171,7 +173,7 @@
     // Get a quote from the given configuration when the delimiter is given
     // Need to pass delimiter to check whether they share the same character
     public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
-        String quoteValue = (String) configuration.get(KEY_QUOTE);
+        String quoteValue = configuration.get(KEY_QUOTE);
         if (quoteValue == null) {
             quoteValue = DEFAULT_QUOTE;
         } else if (quoteValue.length() != 1) {
@@ -188,4 +190,8 @@
         return quoteValue.charAt(0);
     }
 
+    // Get the header flag
+    public static boolean getHasHeader(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(KEY_HEADER));
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
index 122f15a..6b2174f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
@@ -38,29 +38,32 @@
 
     private static final long serialVersionUID = 1L;
     // file input-format <text, seq, rc>
-    private String inputFormat;
+    private final String inputFormat;
     // content format <adm, delimited-text, binary>
-    private String format;
+    private final String format;
     // delimiter in case of delimited text
-    private char delimiter;
+    private final char delimiter;
     // quote in case of delimited text
-    private char quote;
+    private final char quote;
+    // whether delimited text file has a header (which should be ignored)
+    private final boolean hasHeader;
     // parser class name in case of binary format
-    private String parserClassName;
+    private final String parserClassName;
     // the expected data type
-    private ARecordType atype;
+    private final ARecordType atype;
     // the hadoop job conf
     private transient JobConf jobConf;
     // adapter arguments
     private Map<String, String> arguments;
 
     public HDFSIndexingParserFactory(ARecordType atype, String inputFormat, String format, char delimiter,
-            char quote, String parserClassName) {
+                                     char quote, boolean hasHeader, String parserClassName) {
         this.inputFormat = inputFormat;
         this.format = format;
         this.parserClassName = parserClassName;
         this.delimiter = delimiter;
         this.quote = quote;
+        this.hasHeader = hasHeader;
         this.atype = atype;
     }
 
@@ -90,8 +93,8 @@
             return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
         } else if (format.equalsIgnoreCase(StreamBasedAdapterFactory.FORMAT_DELIMITED_TEXT)) {
             // choice 3 with delimited data parser
-            DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDilimitedDataParser(atype,
-                    delimiter, quote);
+            DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype,
+                delimiter, quote, hasHeader);
             return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
         }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
index 4554228..13090626 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
@@ -100,9 +100,10 @@
             // create a delimited text parser
             char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
             char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
+            boolean hasHeader = StreamBasedAdapterFactory.getHasHeader(configuration);
 
-            DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDilimitedDataParser((ARecordType) atype,
-                    delimiter, quote);
+            DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser((ARecordType) atype,
+                    delimiter, quote, hasHeader);
             if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
                 // Text input format
                 TextFileLookupInputStream in = new TextFileLookupInputStream(fileIndexAccessor, jobConf);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
index 3cf8fe6..1114cc9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
@@ -47,7 +47,8 @@
                 dataParser = new ADMDataParser();
                 break;
             case DELIMITED_DATA:
-                dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, false, -1, null);
+                dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader,
+                                                     false, -1, null);
                 break;
         }
         return new ConditionalPushTupleParser(ctx, recordType, dataParser, configuration);
@@ -58,6 +59,7 @@
     private IValueParserFactory[] valueParserFactories;
     private char delimiter;
     private char quote;
+    private boolean hasHeader;
     private final ParserType parserType;
 
     public enum ParserType {
@@ -66,14 +68,14 @@
     }
 
     public ConditionalPushTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, char quote, Map<String, String> configuration) {
+            char fieldDelimiter, char quote, boolean hasHeader, Map<String, String> configuration) {
         this.recordType = recordType;
         this.valueParserFactories = valueParserFactories;
         this.delimiter = fieldDelimiter;
         this.quote = quote;
+        this.hasHeader = hasHeader;
         this.configuration = configuration;
         this.parserType = ParserType.DELIMITED_DATA;
-
     }
 
     public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
@@ -81,7 +83,6 @@
         this.configuration = configuration;
         this.parserType = ParserType.ADM;
     }
-
 }
 
 class ConditionalPushTupleParser extends AbstractTupleParser {
@@ -214,4 +215,4 @@
 
     }
 
-}
\ No newline at end of file
+}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/base/IDataFormat.java b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/base/IDataFormat.java
index c30cf3a..165b797 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/base/IDataFormat.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/base/IDataFormat.java
@@ -71,7 +71,7 @@
 
     public ITupleParserFactory createTupleParser(ARecordType recType, IParseFileSplitsDecl decl);
 
-    public ITupleParserFactory createTupleParser(ARecordType recType, boolean isDelimited, char delimiter, char quote);
+    public ITupleParserFactory createTupleParser(ARecordType recType, boolean isDelimited, char delimiter, char quote, boolean hasHeader);
 
     public IFunctionDescriptor resolveFunction(ILogicalExpression expr, IVariableTypeEnvironment typeEnvironment)
             throws AlgebricksException;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 65a4ea2..4ad3bf5 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -964,12 +964,12 @@
 
     @Override
     public ITupleParserFactory createTupleParser(ARecordType recType, IParseFileSplitsDecl decl) {
-        return createTupleParser(recType, decl.isDelimitedFileFormat(), decl.getDelimChar(), decl.getQuote());
+        return createTupleParser(recType, decl.isDelimitedFileFormat(), decl.getDelimChar(), decl.getQuote(), decl.getHasHeader());
     }
 
     @Override
     public ITupleParserFactory createTupleParser(ARecordType recType, boolean delimitedFormat, char delimiter,
-            char quote) {
+                                                 char quote, boolean hasHeader) {
         if (delimitedFormat) {
             int n = recType.getFieldTypes().length;
             IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
@@ -981,8 +981,8 @@
                 }
                 fieldParserFactories[i] = vpf;
             }
-            return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter, quote, false, -1,
-                    null);
+            return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter, quote, hasHeader,
+                                                         false, -1, null);
         } else {
             return new AdmSchemafullRecordParserFactory(recType, false, -1, null);
         }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataParser.java
index 7a0ae4d..e160586 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataParser.java
@@ -42,6 +42,7 @@
     protected final IValueParserFactory[] valueParserFactories;
     protected final char fieldDelimiter;
     protected final char quote;
+    protected final boolean hasHeader;
     protected final ARecordType recordType;
 
     private IARecordBuilder recBuilder;
@@ -55,8 +56,8 @@
     private byte AUUIDTag = ATypeTag.UUID.serialize();
 
     // Variables used to set a UUID for the auto-generated PK field
-    private boolean isPKAutoGenerated;
-    private int primaryKeyPosition;
+    private final boolean isPKAutoGenerated;
+    private final int primaryKeyPosition;
     private final ARecordType origRecordTypeForAutoGeneratedPK;
 
     private boolean areAllNullFields;
@@ -67,16 +68,18 @@
     private int fieldCount;
 
     public DelimitedDataParser(ARecordType recordType, IValueParserFactory[] valueParserFactories, char fieldDelimter,
-            char quote) {
-        this(recordType, valueParserFactories, fieldDelimter, quote, false, -1, null);
+                               char quote, boolean hasHeader) {
+        this(recordType, valueParserFactories, fieldDelimter, quote, hasHeader, false, -1, null);
     }
 
     public DelimitedDataParser(ARecordType recordType, IValueParserFactory[] valueParserFactories, char fieldDelimter,
-            char quote, boolean isPKAutoGenerated, int primaryKeyPosition, ARecordType origRecordTypeForAutoGeneratedPK) {
+                               char quote, boolean hasHeader, boolean isPKAutoGenerated, int primaryKeyPosition,
+                               ARecordType origRecordTypeForAutoGeneratedPK) {
         this.recordType = recordType;
         this.valueParserFactories = valueParserFactories;
         this.fieldDelimiter = fieldDelimter;
         this.quote = quote;
+        this.hasHeader = hasHeader;
         this.isPKAutoGenerated = isPKAutoGenerated;
         this.primaryKeyPosition = primaryKeyPosition;
         this.origRecordTypeForAutoGeneratedPK = origRecordTypeForAutoGeneratedPK;
@@ -134,11 +137,15 @@
         }
 
         cursor = new FieldCursor(new InputStreamReader(in));
-
     }
 
     @Override
     public boolean parse(DataOutput out) throws AsterixException, IOException {
+        if (hasHeader && lineCount == 1) {
+            // Consume all fields of first record
+            cursor.nextRecord();
+            while (cursor.nextField());
+        }
         while (cursor.nextRecord()) {
             // If PK is auto-generated, then we need to use the recordType that
             // includes PK, since recordType variable does not include PK field.
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
index 8846110..798b510 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
@@ -28,10 +28,12 @@
     private final DelimitedDataParser dataParser;
 
     public DelimitedDataTupleParser(IHyracksTaskContext ctx, ARecordType recType,
-            IValueParserFactory[] valueParserFactories, char fieldDelimter, char quote, boolean isPKAutoGenerated,
-            int primaryKeyPosition, ARecordType origRecordTypeForAutoGeneratedPK) throws HyracksDataException {
+                                    IValueParserFactory[] valueParserFactories,
+                                    char fieldDelimter, char quote, boolean hasHeader,
+                                    boolean isPKAutoGenerated, int primaryKeyPosition,
+                                    ARecordType origRecordTypeForAutoGeneratedPK) throws HyracksDataException {
         super(ctx, recType, isPKAutoGenerated, primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
-        dataParser = new DelimitedDataParser(recType, valueParserFactories, fieldDelimter, quote, isPKAutoGenerated,
+        dataParser = new DelimitedDataParser(recType, valueParserFactories, fieldDelimter, quote, hasHeader, isPKAutoGenerated,
                 primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
     }
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
index a301c7d..137067d 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
@@ -27,28 +27,31 @@
  */
 public class NtDelimitedDataTupleParserFactory implements ITupleParserFactory {
     private static final long serialVersionUID = 1L;
-    protected ARecordType recordType;
+    protected final ARecordType recordType;
     protected IValueParserFactory[] valueParserFactories;
-    protected char fieldDelimiter;
+    protected final char fieldDelimiter;
     // quote is used to enclose a string if it includes delimiter(s) in it.
-    protected char quote;
+    protected final char quote;
+    // whether delimited text file has a header (which should be ignored)
+    protected final boolean hasHeader;
     // To deal with an auto-generated PK
-    protected boolean isPKAutoGenerated;
-    protected int primaryKeyPosition;
-    protected ARecordType origRecordTypeForAutoGeneratedPK;
+    protected final boolean isPKAutoGenerated;
+    protected final int primaryKeyPosition;
+    protected final ARecordType origRecordTypeForAutoGeneratedPK;
 
     public NtDelimitedDataTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, char quote) {
-        this(recordType, valueParserFactories, fieldDelimiter, quote, false, -1, null);
+                                             char fieldDelimiter, char quote, boolean hasHeader) {
+        this(recordType, valueParserFactories, fieldDelimiter, quote, hasHeader, false, -1, null);
     }
 
     public NtDelimitedDataTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, char quote, boolean isPKAutoGenerated, int primaryKeyposition,
+            char fieldDelimiter, char quote, boolean hasHeader, boolean isPKAutoGenerated, int primaryKeyposition,
             ARecordType origRecordTypeForAutoGeneratedPK) {
         this.recordType = recordType;
         this.valueParserFactories = valueParserFactories;
         this.fieldDelimiter = fieldDelimiter;
         this.quote = quote;
+        this.hasHeader = hasHeader;
         this.isPKAutoGenerated = isPKAutoGenerated;
         this.primaryKeyPosition = primaryKeyposition;
         this.origRecordTypeForAutoGeneratedPK = origRecordTypeForAutoGeneratedPK;
@@ -57,6 +60,6 @@
     @Override
     public ITupleParser createTupleParser(final IHyracksTaskContext ctx) throws HyracksDataException {
         return new DelimitedDataTupleParser(ctx, recordType, valueParserFactories, fieldDelimiter, quote,
-                isPKAutoGenerated, primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
+            hasHeader, isPKAutoGenerated, primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
     }
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 5f30465..8f62320 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -137,9 +137,10 @@
             case FORMAT_DELIMITED_TEXT:
                 char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
                 char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
+                boolean hasHeader = StreamBasedAdapterFactory.getHasHeader(configuration);
                 IValueParserFactory[] valueParserFactories = getValueParserFactories(atype);
                 parserFactory = new RateControlledTupleParserFactory(atype, valueParserFactories, delimiter, quote,
-                        configuration);
+                                                                     hasHeader, configuration);
                 break;
         }
     }
@@ -175,6 +176,7 @@
     private IValueParserFactory[] valueParserFactories;
     private char delimiter;
     private char quote;
+    private boolean hasHeader;
     private final ParserType parserType;
 
     public enum ParserType {
@@ -183,17 +185,12 @@
     }
 
     public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, String> configuration) throws AsterixException {
-        this(recordType, valueParserFactories, fieldDelimiter, StreamBasedAdapterFactory.getQuote(configuration,
-                fieldDelimiter), configuration);
-    }
-
-    public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, char quote, Map<String, String> configuration) {
+            char fieldDelimiter, char quote, boolean hasHeader,  Map<String, String> configuration) {
         this.recordType = recordType;
         this.valueParserFactories = valueParserFactories;
         this.delimiter = fieldDelimiter;
         this.quote = quote;
+        this.hasHeader = hasHeader;
         this.configuration = configuration;
         this.parserType = ParserType.DELIMITED_DATA;
     }
@@ -212,7 +209,8 @@
                 dataParser = new ADMDataParser();
                 break;
             case DELIMITED_DATA:
-                dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, false, -1,
+                dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter,
+                                                     quote, hasHeader, false, -1,
                         null);
                 break;
         }