[NO ISSUE]: Improve error handling

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Throw errors relevant to the issue instead of an internal error
- Add valueEmbedder to HDFSInputStream
- Add a codec to read .gzip files as GzipCodec

Change-Id: Icdb0b51cf7444de4074e22b78f02d3b29b07414f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19130
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp
index b68c38b..a384802 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp
@@ -24,13 +24,10 @@
 CREATE TYPE OpenType AS {
 };
 
-CREATE EXTERNAL DATASET Customer(OpenType) USING S3 (
-    ("accessKeyId"="dummyAccessKey"),
-    ("secretAccessKey"="dummySecretKey"),
-    ("region"="us-west-2"),
-    ("serviceEndpoint"="http://127.0.0.1:8001"),
-    ("container"="playground"),
-    ("definition"="external-filter/car/{company:string}/customer/{customer_id:int}"),
+CREATE EXTERNAL DATASET Customer(OpenType) USING %adapter% (
+    %template%,
+    %additional_Properties%,
+    ("definition"="%path_prefix%external-filter/car/{company:string}/customer/{customer_id:int}"),
     ("embed-filter-values" = "false"),
     ("format"="json")
 );
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp
index f1a22d0..a9a7a8f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp
@@ -20,18 +20,15 @@
 USE test;
 
 COPY Customer c
-TO S3
-PATH ("copy-to-result", "car", company, "customer", customer_id)
+TO %adapter%
+PATH (%pathprefix% "copy-to-result", "car", company, "customer", customer_id)
 OVER (
    PARTITION BY c.company company,
                 c.customer_id customer_id
 )
 WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001",
-    "container":"playground",
+    %template_colons%,
+    %additionalProperties%
     "format":"json",
     "compression":"gzip"
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp
index 14d1d92..f46ddf9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp
@@ -19,13 +19,10 @@
 
 USE test;
 
-CREATE EXTERNAL DATASET CustomerCopy(OpenType) USING S3 (
-    ("accessKeyId"="dummyAccessKey"),
-    ("secretAccessKey"="dummySecretKey"),
-    ("region"="us-west-2"),
-    ("serviceEndpoint"="http://127.0.0.1:8001"),
-    ("container"="playground"),
-    ("definition"="copy-to-result/car/{company:string}/customer/{customer_id:int}"),
+CREATE EXTERNAL DATASET CustomerCopy(OpenType) USING %adapter% (
+    %template%,
+    %additional_Properties%,
+    ("definition"="%path_prefix%copy-to-result/car/{company:string}/customer/{customer_id:int}"),
     ("embed-filter-values" = "false"),
     ("format"="json")
 );
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 5ae2fc5..c8a6785 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -21,6 +21,11 @@
   <test-group name="copy-to">
     <test-case FilePath="copy-to">
       <compilation-unit name="partition">
+        <placeholder name="adapter" value="S3" />
+        <placeholder name="pathprefix" value="" />
+        <placeholder name="path_prefix" value="" />
+        <placeholder name="additionalProperties" value='"container":"playground",' />
+        <placeholder name="additional_Properties" value='("container"="playground")' />
         <output-dir compare="Text">partition</output-dir>
       </compilation-unit>
     </test-case>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
index a5af248..6851433 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
@@ -93,6 +93,16 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="copy-to">
+      <compilation-unit name="partition">
+        <placeholder name="adapter" value="HDFS" />
+        <placeholder name="pathprefix" value='"/playground", ' />
+        <placeholder name="path_prefix" value="/playground/" />
+        <placeholder name="additionalProperties" value="" />
+        <placeholder name="additional_Properties" value='("input-format" = "text-input-format")' />
+        <output-dir compare="Text">partition</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="copy-to">
       <compilation-unit name="simple-write">
         <placeholder name="adapter" value="HDFS" />
         <placeholder name="pathprefix" value='"/playground", ' />
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 135b93f..934ba1d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -73,6 +73,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
@@ -160,8 +161,10 @@
             }
         } catch (FileNotFoundException ex) {
             throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-        } catch (InterruptedException | IOException ex) {
+        } catch (InterruptedException ex) {
             throw HyracksDataException.create(ex);
+        } catch (IOException ex) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ExceptionUtils.getMessageOrToString(ex));
         }
     }
 
@@ -229,10 +232,11 @@
      * 1. when target files are not null, it generates a file aware input stream that validate
      * against the files
      * 2. if the data is binary, it returns a generic reader */
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx) throws HyracksDataException {
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, IExternalDataRuntimeContext context)
+            throws HyracksDataException {
         try {
             restoreConfig(ctx);
-            return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, ugi);
+            return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, ugi, context);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -307,7 +311,7 @@
         try {
             if (recordReaderClazz != null) {
                 StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
-                streamReader.configure(ctx, createInputStream(ctx), configuration);
+                streamReader.configure(ctx, createInputStream(ctx, context), configuration);
                 return streamReader;
             }
             restoreConfig(ctx);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
index d508336..c1bad6c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java
@@ -24,9 +24,12 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -49,12 +52,14 @@
     private JobConf conf;
     private int pos = 0;
     private UserGroupInformation ugi;
+    private IExternalFilterValueEmbedder valueEmbedder;
 
     @SuppressWarnings("unchecked")
     public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName,
-            JobConf conf, Map<String, String> configuration, UserGroupInformation ugi)
-            throws IOException, AsterixException {
+            JobConf conf, Map<String, String> configuration, UserGroupInformation ugi,
+            IExternalDataRuntimeContext context) throws IOException, AsterixException {
         this.ugi = ugi;
+        this.valueEmbedder = context.getValueEmbedder();
         this.read = read;
         this.inputSplits = inputSplits;
         this.readSchedule = readSchedule;
@@ -169,6 +174,7 @@
     }
 
     private RecordReader<Object, Text> getRecordReader(int splitIndex) throws IOException {
+        valueEmbedder.setPath(getPath(inputSplits[splitIndex]));
         try {
             reader = ugi == null ? getReader(splitIndex)
                     : ugi.doAs((PrivilegedExceptionAction<RecordReader<Object, Text>>) () -> getReader(splitIndex));
@@ -187,4 +193,12 @@
     private RecordReader<Object, Text> getReader(int splitIndex) throws IOException {
         return (RecordReader<Object, Text>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL);
     }
+
+    private String getPath(InputSplit split) {
+        if (split instanceof FileSplit) {
+            return ((FileSplit) split).getPath().toString();
+        } else {
+            return split.toString();
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 1de2cd2..d487e68 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -179,6 +179,7 @@
     public static final String CLASS_NAME_HDFS_FILESYSTEM = "org.apache.hadoop.hdfs.DistributedFileSystem";
     public static final String S3A_CHANGE_DETECTION_REQUIRED = "requireVersionChangeDetection";
     public static final String S3A_CHANGE_DETECTION_REQUIRED_CONFIG_KEY = "fs.s3a.change.detection.version.required";
+    public static final String HDFS_IO_COMPRESSION_CODECS_KEY = "io.compression.codecs";
     /**
      * input formats aliases
      */
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index f06638d..4d093f5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -1134,8 +1134,7 @@
                 protocol = nodePathPair[0];
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS:
-                protocol = ExternalDataConstants.KEY_HDFS_URL;
-                break;
+                return configurations.get(ExternalDataConstants.KEY_HDFS_URL).replaceAll("/+$", "");
             default:
                 return "";
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 75d68ba..35f2a94 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -70,6 +70,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -262,6 +263,7 @@
         if (useDatanodeHostname != null) {
             conf.set(ExternalDataConstants.KEY_HDFS_USE_DATANODE_HOSTNAME, useDatanodeHostname);
         }
+        conf.set(ExternalDataConstants.HDFS_IO_COMPRESSION_CODECS_KEY, AliasGzipCodec.class.getName());
         return conf;
     }
 
@@ -547,6 +549,9 @@
 
     public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
+        if (configuration.get(ExternalDataConstants.KEY_HDFS_URL) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_HDFS_URL);
+        }
         if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc,
                     ExternalDataConstants.KEY_INPUT_FORMAT);
@@ -590,4 +595,11 @@
         return ExternalDataConstants.KEY_ADAPTER_NAME_HDFS
                 .equalsIgnoreCase(configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE));
     }
+
+    public static class AliasGzipCodec extends GzipCodec {
+        @Override
+        public String getDefaultExtension() {
+            return "." + ExternalDataConstants.KEY_COMPRESSION_GZIP;
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
index 9203c8f..f06b8e9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.writer;
 
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -79,7 +81,7 @@
                     }
                 }
             } catch (IOException ex) {
-                throw HyracksDataException.create(ex);
+                throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
             }
         }
     }
@@ -94,8 +96,8 @@
             printer.newStream(outputStream);
         } catch (FileAlreadyExistsException e) {
             return false;
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
+        } catch (IOException ex) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
         }
         return true;
     }
@@ -113,7 +115,7 @@
                 fs.delete(path, false);
             }
         } catch (IOException ex) {
-            throw HyracksDataException.create(ex);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
         }
     }
 
@@ -125,7 +127,7 @@
                 fs.rename(path, new Path(path.getParent(), path.getName().substring(1)));
             }
         } catch (IOException ex) {
-            throw HyracksDataException.create(ex);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
         }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index a4113d1..7bb07bc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.writer;
 
-import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
@@ -89,7 +88,7 @@
         } catch (InterruptedException ex) {
             throw HyracksDataException.create(ex);
         } catch (IOException ex) {
-            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
         }
     }
 
@@ -135,7 +134,7 @@
                 doValidate(testFs);
             }
         } catch (IOException ex) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex));
+            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex));
         }
     }
 
@@ -171,7 +170,7 @@
                 outputStream.write(0);
             }
         } catch (IOException ex) {
-            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
+            throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
         }
     }
 }