[ASTERIXDB-3503][EXT] Add optional parsing parameters for delta

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:
- Add the following optional parsing parameters for delta:
   -- timestamp-to-long: parse date as int; otherwise as ADateTime
   -- date-to-int: parse date as int; otherwise as ADate
   -- decimal-to-double
   -- timezone

Ext-ref: MB-63840

Change-Id: I6fa3d46588116508716fc1abd693f75ee5538d7f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
new file mode 100644
index 0000000..81e465c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta.converter;
+
+import java.io.DataOutput;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DeltaConverterContext extends ParserContext {
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADate> dateSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+    private final boolean decimalToDouble;
+    private final boolean timestampAsLong;
+    private final boolean dateAsInt;
+
+    private final int timeZoneOffset;
+    private final AMutableDate mutableDate = new AMutableDate(0);
+    private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+
+    public DeltaConverterContext(Map<String, String> configuration) {
+        decimalToDouble = Boolean.parseBoolean(configuration
+                .getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
+        timestampAsLong = Boolean.parseBoolean(configuration
+                .getOrDefault(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG, ExternalDataConstants.TRUE));
+        dateAsInt = Boolean.parseBoolean(
+                configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DATE_AS_INT, ExternalDataConstants.TRUE));
+        String configuredTimeZoneId = configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE);
+        if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+            timeZoneOffset = TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+        } else {
+            timeZoneOffset = 0;
+        }
+    }
+
+    public void serializeDate(int value, DataOutput output) {
+        try {
+            mutableDate.setValue(value);
+            dateSerDer.serialize(mutableDate, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void serializeDateTime(long timestamp, DataOutput output) {
+        try {
+            mutableDateTime.setValue(timestamp);
+            datetimeSerDer.serialize(mutableDateTime, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public boolean isDecimalToDoubleEnabled() {
+        return decimalToDouble;
+    }
+
+    public int getTimeZoneOffset() {
+        return timeZoneOffset;
+    }
+
+    public boolean isTimestampAsLong() {
+        return timestampAsLong;
+    }
+
+    public boolean isDateAsInt() {
+        return dateAsInt;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
index e56be86..ea02d77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.external.parser;
 
-import static org.apache.avro.Schema.Type.NULL;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.builders.IARecordBuilder;
@@ -34,7 +34,8 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
@@ -63,11 +64,11 @@
 import io.delta.kernel.types.TimestampType;
 
 public class DeltaDataParser extends AbstractDataParser implements IRecordDataParser<Row> {
-    private final ParserContext parserContext;
+    private final DeltaConverterContext parserContext;
     private final IExternalFilterValueEmbedder valueEmbedder;
 
-    public DeltaDataParser(IExternalDataRuntimeContext context) {
-        parserContext = new ParserContext();
+    public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
+        parserContext = new DeltaConverterContext(conf);
         valueEmbedder = context.getValueEmbedder();
     }
 
@@ -160,7 +161,6 @@
         if (isNull) {
             return ATypeTag.NULL;
         }
-
         if (schema instanceof BooleanType) {
             return ATypeTag.BOOLEAN;
         } else if (schema instanceof ShortType || schema instanceof IntegerType || schema instanceof LongType) {
@@ -170,15 +170,24 @@
         } else if (schema instanceof StringType) {
             return ATypeTag.STRING;
         } else if (schema instanceof DateType) {
-            return ATypeTag.BIGINT;
+            if (parserContext.isDateAsInt()) {
+                return ATypeTag.INTEGER;
+            }
+            return ATypeTag.DATE;
         } else if (schema instanceof TimestampType || schema instanceof TimestampNTZType) {
-            return ATypeTag.BIGINT;
+            if (parserContext.isTimestampAsLong()) {
+                return ATypeTag.BIGINT;
+            }
+            return ATypeTag.DATETIME;
         } else if (schema instanceof BinaryType) {
             return ATypeTag.BINARY;
         } else if (schema instanceof ArrayType) {
             return ATypeTag.ARRAY;
         } else if (schema instanceof StructType) {
             return ATypeTag.OBJECT;
+        } else if (schema instanceof DecimalType) {
+            ensureDecimalToDoubleEnabled(schema, parserContext);
+            return ATypeTag.DOUBLE;
         } else {
             throw createUnsupportedException(schema);
         }
@@ -204,11 +213,26 @@
         } else if (schema instanceof StringType) {
             serializeString(row.getString(index), out);
         } else if (schema instanceof DateType) {
-            serializeDate(row.getInt(index), out);
+            if (parserContext.isDateAsInt()) {
+                serializeLong(row.getInt(index), out);
+            } else {
+                parserContext.serializeDate(row.getInt(index), out);
+            }
         } else if (schema instanceof TimestampType) {
-            serializeTimestamp(row.getLong(index), out);
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(row.getLong(index));
+            int offset = parserContext.getTimeZoneOffset();
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis + offset, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis + offset, out);
+            }
         } else if (schema instanceof TimestampNTZType) {
-            serializeTimestamp(row.getLong(index), out);
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(row.getLong(index));
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
         } else if (schema instanceof StructType) {
             parseObject(row.getStruct(index), out);
         } else if (schema instanceof ArrayType) {
@@ -240,11 +264,26 @@
         } else if (schema instanceof StringType) {
             serializeString(column.getString(index), out);
         } else if (schema instanceof DateType) {
-            serializeDate(column.getInt(index), out);
+            if (parserContext.isDateAsInt()) {
+                serializeLong(column.getInt(index), out);
+            } else {
+                parserContext.serializeDate(column.getInt(index), out);
+            }
         } else if (schema instanceof TimestampType) {
-            serializeTimestamp(column.getLong(index), out);
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(column.getLong(index));
+            int offset = parserContext.getTimeZoneOffset();
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis + offset, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis + offset, out);
+            }
         } else if (schema instanceof TimestampNTZType) {
-            serializeTimestamp(column.getLong(index), out);
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(column.getLong(index));
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
         } else if (schema instanceof ArrayType) {
             parseArray((ArrayType) schema, column.getArray(index), out);
         } else if (schema instanceof StructType) {
@@ -273,16 +312,6 @@
         stringSerde.serialize(aString, out);
     }
 
-    private void serializeDate(Object value, DataOutput out) throws HyracksDataException {
-        aInt32.setValue((Integer) value);
-        int32Serde.serialize(aInt32, out);
-    }
-
-    private void serializeTimestamp(Object value, DataOutput out) throws HyracksDataException {
-        aInt64.setValue(TimeUnit.MICROSECONDS.toMillis((Long) value));
-        int64Serde.serialize(aInt64, out);
-    }
-
     private void serializeDecimal(BigDecimal value, DataOutput out) throws HyracksDataException {
         serializeDouble(value.doubleValue(), out);
     }
@@ -290,4 +319,12 @@
     private static HyracksDataException createUnsupportedException(DataType schema) {
         return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Delta Parser", schema.toString());
     }
+
+    private static void ensureDecimalToDoubleEnabled(DataType type, DeltaConverterContext context)
+            throws RuntimeDataException {
+        if (!context.isDecimalToDoubleEnabled()) {
+            throw new RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, type.toString(),
+                    ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE);
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
index 5d4b2dd..7f28105 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
@@ -60,7 +60,6 @@
     }
 
     private DeltaDataParser createParser(IExternalDataRuntimeContext context) {
-        return new DeltaDataParser(context);
+        return new DeltaDataParser(context, configuration);
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index d8f89c2..ffe75cb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.external.util;
 
 import java.util.Set;
-import java.util.TimeZone;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -328,6 +327,16 @@
         WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
     }
 
+    public static class DeltaOptions {
+        private DeltaOptions() {
+        }
+
+        public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+        public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+        public static final String DATE_AS_INT = "date-to-int";
+        public static final String TIMEZONE = "timezone";
+    }
+
     public static class ParquetOptions {
         private ParquetOptions() {
         }
@@ -357,10 +366,5 @@
          */
         public static final String TIMEZONE = "timezone";
         public static final String HADOOP_TIMEZONE = ASTERIX_HADOOP_PREFIX + TIMEZONE;
-
-        /**
-         * Valid time zones that are supported by Java
-         */
-        public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index bf74079..bc5b8c3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -48,6 +48,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -109,6 +111,8 @@
 import org.apache.iceberg.io.CloseableIterable;
 
 public class ExternalDataUtils {
+
+    private static final Set<String> validTimeZones = Set.of(TimeZone.getAvailableIDs());
     private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
     private static final int HEADER_FUDGE = 64;
@@ -493,6 +497,11 @@
             throw new CompilationException(ErrorCode.INVALID_DELTA_TABLE_FORMAT,
                     configuration.get(ExternalDataConstants.KEY_FORMAT));
         }
+        if (configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE)
+                && !validTimeZones.contains(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE))) {
+            throw new CompilationException(ErrorCode.INVALID_TIMEZONE,
+                    configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE));
+        }
     }
 
     public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
@@ -928,7 +937,7 @@
             if (datasetRecordType.getFieldTypes().length != 0) {
                 throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName());
             } else if (properties.containsKey(ParquetOptions.TIMEZONE)
-                    && !ParquetOptions.VALID_TIME_ZONES.contains(properties.get(ParquetOptions.TIMEZONE))) {
+                    && !validTimeZones.contains(properties.get(ParquetOptions.TIMEZONE))) {
                 //Ensure the configured time zone id is correct
                 throw new CompilationException(ErrorCode.INVALID_TIMEZONE, properties.get(ParquetOptions.TIMEZONE));
             }