Fix ADM Parser Bug

This change make sure that each adm parser instance use its own objects for parsing
and not share it with other parser running on the same NC.

Change-Id: Ib54dd2f9f8474ddb8dc2d785f819dd62c7ce7ca3
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/73
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/ADMDataParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/ADMDataParser.java
index 012c5e9..17fb427 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/ADMDataParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/ADMDataParser.java
@@ -93,17 +93,17 @@
         public ParseException(String message, Throwable cause) {
             super(message, cause);
         }
-        
+
         public ParseException(Throwable cause, int line, int column) {
             super(cause);
             setLocation(line, column);
         }
-        
+
         public void setLocation(int line, int column) {
             this.line = line;
             this.column = column;
         }
-        
+
         public String getMessage() {
             StringBuilder msg = new StringBuilder("Parse error");
             if (line >= 0) {
@@ -116,12 +116,12 @@
             return msg.append(": " + super.getMessage()).toString();
         }
     }
-    
+
     @Override
     public boolean parse(DataOutput out) throws AsterixException {
         try {
             return parseAdmInstance((IAType) recordType, datasetRec, out);
-        } catch (IOException e) {            
+        } catch (IOException e) {
             throw new ParseException(e, admLexer.getLine(), admLexer.getColumn());
         } catch (AdmLexerException e) {
             throw new AsterixException(e);
@@ -726,8 +726,7 @@
             IOException {
         final ATypeTag targetTypeTag = getTargetTypeTag(typeTag, objectType);
         if (targetTypeTag == null || !parseValue(admLexer.getLastTokenImage(), targetTypeTag, out)) {
-            throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2
-                    + typeTag);
+            throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag);
         }
     }
 
@@ -741,8 +740,7 @@
         }
 
         if (targetTypeTag == null || !parseValue(admLexer.getLastTokenImage(), typeTag, dataOutput)) {
-            throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2
-                    + typeTag);
+            throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag);
         }
 
         if (targetTypeTag != typeTag) {
@@ -824,37 +822,37 @@
                 stringSerde.serialize(aString, out);
                 return true;
             case TIME:
-                ATimeSerializerDeserializer.parse(unquoted, out);
+                parseTime(unquoted, out);
                 return true;
             case DATE:
-                ADateSerializerDeserializer.parse(unquoted, out);
+                parseDate(unquoted, out);
                 return true;
             case DATETIME:
-                ADateTimeSerializerDeserializer.parse(unquoted, out);
+                parseDateTime(unquoted, out);
                 return true;
             case DURATION:
-                ADurationSerializerDeserializer.parse(unquoted, out);
+                parseDuration(unquoted, out);
                 return true;
             case DAYTIMEDURATION:
-                ADayTimeDurationSerializerDeserializer.INSTANCE.parse(unquoted, out);
+                parseDateTimeDuration(unquoted, out);
                 return true;
             case YEARMONTHDURATION:
-                AYearMonthDurationSerializerDeserializer.INSTANCE.parse(unquoted, out);
+                parseYearMonthDuration(unquoted, out);
                 return true;
             case POINT:
-                APointSerializerDeserializer.parse(unquoted, out);
+                parsePoint(unquoted, out);
                 return true;
             case POINT3D:
-                APoint3DSerializerDeserializer.parse(unquoted, out);
+                parse3DPoint(unquoted, out);
                 return true;
             case CIRCLE:
-                ACircleSerializerDeserializer.parse(unquoted, out);
+                parseCircle(unquoted, out);
                 return true;
             case RECTANGLE:
-                ARectangleSerializerDeserializer.parse(unquoted, out);
+                parseRectangle(unquoted, out);
                 return true;
             case LINE:
-                ALineSerializerDeserializer.parse(unquoted, out);
+                parseLine(unquoted, out);
                 return true;
             case POLYGON:
                 APolygonSerializerDeserializer.parse(unquoted, out);
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractDataParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractDataParser.java
index 34bc1f9..470b0e9 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractDataParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractDataParser.java
@@ -14,25 +14,56 @@
  */
 package edu.uci.ics.asterix.runtime.operators.file;
 
+import java.io.DataOutput;
+
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.base.ACircle;
+import edu.uci.ics.asterix.om.base.ADate;
+import edu.uci.ics.asterix.om.base.ADateTime;
+import edu.uci.ics.asterix.om.base.ADayTimeDuration;
 import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.ADuration;
 import edu.uci.ics.asterix.om.base.AFloat;
 import edu.uci.ics.asterix.om.base.AInt16;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AInt64;
 import edu.uci.ics.asterix.om.base.AInt8;
+import edu.uci.ics.asterix.om.base.ALine;
+import edu.uci.ics.asterix.om.base.AMutableCircle;
+import edu.uci.ics.asterix.om.base.AMutableDate;
+import edu.uci.ics.asterix.om.base.AMutableDateTime;
+import edu.uci.ics.asterix.om.base.AMutableDayTimeDuration;
 import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableDuration;
 import edu.uci.ics.asterix.om.base.AMutableFloat;
 import edu.uci.ics.asterix.om.base.AMutableInt16;
 import edu.uci.ics.asterix.om.base.AMutableInt32;
 import edu.uci.ics.asterix.om.base.AMutableInt64;
 import edu.uci.ics.asterix.om.base.AMutableInt8;
+import edu.uci.ics.asterix.om.base.AMutableLine;
+import edu.uci.ics.asterix.om.base.AMutablePoint;
+import edu.uci.ics.asterix.om.base.AMutablePoint3D;
+import edu.uci.ics.asterix.om.base.AMutableRectangle;
 import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.AMutableTime;
+import edu.uci.ics.asterix.om.base.AMutableYearMonthDuration;
 import edu.uci.ics.asterix.om.base.ANull;
+import edu.uci.ics.asterix.om.base.APoint;
+import edu.uci.ics.asterix.om.base.APoint3D;
+import edu.uci.ics.asterix.om.base.ARectangle;
 import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.ATime;
+import edu.uci.ics.asterix.om.base.AYearMonthDuration;
+import edu.uci.ics.asterix.om.base.temporal.ADateParserFactory;
+import edu.uci.ics.asterix.om.base.temporal.ADurationParserFactory;
+import edu.uci.ics.asterix.om.base.temporal.ATimeParserFactory;
+import edu.uci.ics.asterix.om.base.temporal.GregorianCalendarSystem;
+import edu.uci.ics.asterix.om.base.temporal.ADurationParserFactory.ADurationParseOption;
 import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Base class for data parsers. Includes the common set of definitions for
@@ -49,6 +80,20 @@
     protected AMutableString aString = new AMutableString("");
     protected AMutableString aStringFieldName = new AMutableString("");
 
+    // For temporal and spatial data types
+    protected AMutableTime aTime = new AMutableTime(0);
+    protected AMutableDateTime aDateTime = new AMutableDateTime(0L);
+    protected AMutableDuration aDuration = new AMutableDuration(0, 0);
+    protected AMutableDayTimeDuration aDayTimeDuration = new AMutableDayTimeDuration(0);
+    protected AMutableYearMonthDuration aYearMonthDuration = new AMutableYearMonthDuration(0);
+    protected AMutablePoint aPoint = new AMutablePoint(0, 0);
+    protected AMutablePoint3D aPoint3D = new AMutablePoint3D(0, 0, 0);
+    protected AMutableCircle aCircle = new AMutableCircle(null, 0);
+    protected AMutableRectangle aRectangle = new AMutableRectangle(null, null);
+    protected AMutablePoint aPoint2 = new AMutablePoint(0, 0);
+    protected AMutableLine aLine = new AMutableLine(null, null);
+    protected AMutableDate aDate = new AMutableDate(0);
+
     // Serializers
     @SuppressWarnings("unchecked")
     protected ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
@@ -78,4 +123,187 @@
     protected ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(BuiltinType.ANULL);
 
+    // To avoid race conditions, the serdes for temporal and spatial data types needs to be one per parser
+    @SuppressWarnings("unchecked")
+    protected static final ISerializerDeserializer<ATime> timeSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ATIME);
+    @SuppressWarnings("unchecked")
+    protected static final ISerializerDeserializer<ADate> dateSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADATE);
+    @SuppressWarnings("unchecked")
+    protected static final ISerializerDeserializer<ADateTime> datetimeSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADATETIME);
+    @SuppressWarnings("unchecked")
+    protected static final ISerializerDeserializer<ADuration> durationSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADURATION);
+    @SuppressWarnings("unchecked")
+    protected static final ISerializerDeserializer<ADayTimeDuration> dayTimeDurationSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ADAYTIMEDURATION);
+    @SuppressWarnings("unchecked")
+    protected static final ISerializerDeserializer<AYearMonthDuration> yearMonthDurationSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AYEARMONTHDURATION);
+    @SuppressWarnings("unchecked")
+    protected final static ISerializerDeserializer<APoint> pointSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.APOINT);
+    @SuppressWarnings("unchecked")
+    protected final static ISerializerDeserializer<APoint3D> point3DSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.APOINT3D);
+    @SuppressWarnings("unchecked")
+    protected final static ISerializerDeserializer<ACircle> circleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ACIRCLE);
+    @SuppressWarnings("unchecked")
+    protected final static ISerializerDeserializer<ARectangle> rectangleSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ARECTANGLE);
+    @SuppressWarnings("unchecked")
+    protected final static ISerializerDeserializer<ALine> lineSerde = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.ALINE);
+
+    protected void parseTime(String time, DataOutput out) throws HyracksDataException {
+        int chrononTimeInMs;
+        try {
+            chrononTimeInMs = ATimeParserFactory.parseTimePart(time, 0, time.length());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        aTime.setValue(chrononTimeInMs);
+        timeSerde.serialize(aTime, out);
+    }
+
+    protected void parseDate(String date, DataOutput out) throws HyracksDataException {
+        long chrononTimeInMs = 0;
+        try {
+            chrononTimeInMs = ADateParserFactory.parseDatePart(date, 0, date.length());
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        short temp = 0;
+        if (chrononTimeInMs < 0 && chrononTimeInMs % GregorianCalendarSystem.CHRONON_OF_DAY != 0) {
+            temp = 1;
+        }
+        aDate.setValue((int) (chrononTimeInMs / GregorianCalendarSystem.CHRONON_OF_DAY) - temp);
+        dateSerde.serialize(aDate, out);
+    }
+
+    protected void parseDateTime(String datetime, DataOutput out) throws HyracksDataException {
+        long chrononTimeInMs = 0;
+        try {
+            // +1 if it is negative (-)
+            short timeOffset = (short) ((datetime.charAt(0) == '-') ? 1 : 0);
+
+            timeOffset += 8;
+
+            if (datetime.charAt(timeOffset) != 'T') {
+                timeOffset += 2;
+                if (datetime.charAt(timeOffset) != 'T') {
+                    throw new AlgebricksException("This can not be an instance of datetime: missing T");
+                }
+            }
+            chrononTimeInMs = ADateParserFactory.parseDatePart(datetime, 0, timeOffset);
+            chrononTimeInMs += ATimeParserFactory.parseTimePart(datetime, timeOffset + 1, datetime.length()
+                    - timeOffset - 1);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        aDateTime.setValue(chrononTimeInMs);
+        datetimeSerde.serialize(aDateTime, out);
+    }
+
+    protected void parseDuration(String duration, DataOutput out) throws HyracksDataException {
+        try {
+            ADurationParserFactory.parseDuration(duration, 0, duration.length(), aDuration, ADurationParseOption.All);
+            durationSerde.serialize(aDuration, out);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    protected void parseDateTimeDuration(String durationString, DataOutput out) throws HyracksDataException {
+        try {
+            ADurationParserFactory.parseDuration(durationString, 0, durationString.length(), aDayTimeDuration,
+                    ADurationParseOption.All);
+            dayTimeDurationSerde.serialize(aDayTimeDuration, out);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    protected void parseYearMonthDuration(String durationString, DataOutput out) throws HyracksDataException {
+        try {
+            ADurationParserFactory.parseDuration(durationString, 0, durationString.length(), aYearMonthDuration,
+                    ADurationParseOption.All);
+            yearMonthDurationSerde.serialize(aYearMonthDuration, out);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    protected void parsePoint(String point, DataOutput out) throws HyracksDataException {
+        try {
+            aPoint.setValue(Double.parseDouble(point.substring(0, point.indexOf(','))),
+                    Double.parseDouble(point.substring(point.indexOf(',') + 1, point.length())));
+            pointSerde.serialize(aPoint, out);
+        } catch (HyracksDataException e) {
+            throw new HyracksDataException(point + " can not be an instance of point");
+        }
+    }
+
+    protected void parse3DPoint(String point3d, DataOutput out) throws HyracksDataException {
+        try {
+            int firstCommaIndex = point3d.indexOf(',');
+            int secondCommaIndex = point3d.indexOf(',', firstCommaIndex + 1);
+            aPoint3D.setValue(Double.parseDouble(point3d.substring(0, firstCommaIndex)),
+                    Double.parseDouble(point3d.substring(firstCommaIndex + 1, secondCommaIndex)),
+                    Double.parseDouble(point3d.substring(secondCommaIndex + 1, point3d.length())));
+            point3DSerde.serialize(aPoint3D, out);
+        } catch (HyracksDataException e) {
+            throw new HyracksDataException(point3d + " can not be an instance of point3d");
+        }
+    }
+
+    protected void parseCircle(String circle, DataOutput out) throws HyracksDataException {
+        try {
+            String[] parts = circle.split(" ");
+            aPoint.setValue(Double.parseDouble(parts[0].split(",")[0]), Double.parseDouble(parts[0].split(",")[1]));
+            aCircle.setValue(aPoint, Double.parseDouble(parts[1].substring(0, parts[1].length())));
+            circleSerde.serialize(aCircle, out);
+        } catch (HyracksDataException e) {
+            throw new HyracksDataException(circle + " can not be an instance of circle");
+        }
+    }
+
+    protected void parseRectangle(String rectangle, DataOutput out) throws HyracksDataException {
+        try {
+            String[] points = rectangle.split(" ");
+            if (points.length != 2)
+                throw new HyracksDataException("rectangle consists of only 2 points.");
+            aPoint.setValue(Double.parseDouble(points[0].split(",")[0]), Double.parseDouble(points[0].split(",")[1]));
+            aPoint2.setValue(Double.parseDouble(points[1].split(",")[0]), Double.parseDouble(points[1].split(",")[1]));
+            if (aPoint.getX() > aPoint2.getX() && aPoint.getY() > aPoint2.getY()) {
+                aRectangle.setValue(aPoint2, aPoint);
+            } else if (aPoint.getX() < aPoint2.getX() && aPoint.getY() < aPoint2.getY()) {
+                aRectangle.setValue(aPoint, aPoint2);
+            } else {
+                throw new IllegalArgumentException(
+                        "Rectangle arugment must be either (bottom left point, top right point) or (top right point, bottom left point)");
+            }
+            rectangleSerde.serialize(aRectangle, out);
+        } catch (HyracksDataException e) {
+            throw new HyracksDataException(rectangle + " can not be an instance of rectangle");
+        }
+    }
+
+    protected void parseLine(String line, DataOutput out) throws HyracksDataException {
+        try {
+            String[] points = line.split(" ");
+            if (points.length != 2)
+                throw new HyracksDataException("line consists of only 2 points.");
+            aPoint.setValue(Double.parseDouble(points[0].split(",")[0]), Double.parseDouble(points[0].split(",")[1]));
+            aPoint2.setValue(Double.parseDouble(points[1].split(",")[0]), Double.parseDouble(points[1].split(",")[1]));
+            aLine.setValue(aPoint, aPoint2);
+            lineSerde.serialize(aLine, out);
+        } catch (HyracksDataException e) {
+            throw new HyracksDataException(line + " can not be an instance of line");
+        }
+    }
+
 }