[NO ISSUE][EXT] Configurable Gzip compression level

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Add an option "gzipCompressionLevel" to specify the
  compression level (1-9) to be used when gzip is used
  with COPY TO.
- Default gzip compression level to -1 (library default)
  when not specified.
- Make the compression buffer size equal to a frame size.

Change-Id: I6e80691e232269620d76e6b6f414cff6856f3232
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
index 6df40c3..1d7253b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
@@ -36,7 +36,8 @@
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
     "format":"json",
-    "compression":"gzip"
+    "compression":"gzip",
+    "gzipCompressionLevel": "1"
 }
 
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fc8f527..79252ad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -304,6 +304,7 @@
      * Compression constants
      */
     public static final String KEY_COMPRESSION_GZIP = "gzip";
+    public static final String KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL = "gzipCompressionLevel";
 
     /**
      * Writer Constants
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index ae3b567..b02122d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -1085,4 +1085,8 @@
                 return ExternalDataConstants.KEY_PATH;
         }
     }
+
+    public static boolean isGzipCompression(String compression) {
+        return ExternalDataConstants.KEY_COMPRESSION_GZIP.equalsIgnoreCase(compression);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
index 00fe855..b348c1e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.util;
 
+import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -59,6 +62,9 @@
         checkSupported(ExternalDataConstants.KEY_WRITER_COMPRESSION, compression,
                 ExternalDataConstants.WRITER_SUPPORTED_COMPRESSION, ErrorCode.UNKNOWN_COMPRESSION_SCHEME,
                 sourceLocation, true);
+        if (ExternalDataUtils.isGzipCompression(compression)) {
+            validateGzipCompressionLevel(configuration, sourceLocation);
+        }
     }
 
     private static void validateMaxResult(Map<String, String> configuration, SourceLocation sourceLocation)
@@ -92,4 +98,21 @@
         }
     }
 
+    private static void validateGzipCompressionLevel(Map<String, String> configuration, SourceLocation sourceLocation)
+            throws CompilationException {
+        String compressionLevelStr = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL);
+        if (compressionLevelStr == null) {
+            return;
+        }
+        try {
+            int compressionLevel = Integer.parseInt(compressionLevelStr);
+            if (compressionLevel < 1 || compressionLevel > 9) {
+                throw new CompilationException(INVALID_REQ_PARAM_VAL, sourceLocation,
+                        ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL, compressionLevelStr);
+            }
+        } catch (NumberFormatException e) {
+            throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, compressionLevelStr);
+        }
+    }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
index 5ef196d..a587487 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
@@ -22,19 +22,30 @@
 import java.io.OutputStream;
 
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipParameters;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class GzipExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory {
     private static final long serialVersionUID = -7364595253362922025L;
-    public static IExternalFileCompressStreamFactory INSTANCE = new GzipExternalFileCompressStreamFactory();
+    private final int compressionLevel;
+    private final int bufferSize;
 
-    private GzipExternalFileCompressStreamFactory() {
+    public static GzipExternalFileCompressStreamFactory create(int compressionLevel, int bufferSize) {
+        return new GzipExternalFileCompressStreamFactory(compressionLevel, bufferSize);
+    }
+
+    private GzipExternalFileCompressStreamFactory(int compressionLevel, int bufferSize) {
+        this.compressionLevel = compressionLevel;
+        this.bufferSize = bufferSize;
     }
 
     @Override
     public OutputStream createStream(OutputStream outputStream) throws HyracksDataException {
         try {
-            return new GzipCompressorOutputStream(outputStream);
+            GzipParameters gzipParam = new GzipParameters();
+            gzipParam.setCompressionLevel(compressionLevel);
+            gzipParam.setBufferSize(bufferSize);
+            return new GzipCompressorOutputStream(outputStream, gzipParam);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 416e32f..e861a3f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -761,7 +761,7 @@
         fileWriterFactory.validate();
         String fileExtension = ExternalWriterProvider.getFileExtension(sink);
         int maxResult = ExternalWriterProvider.getMaxResult(sink);
-        IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
+        IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(appCtx, sink, sourceType);
         ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
                 fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
         SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9253a48..cf51200 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -20,10 +20,12 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.zip.Deflater;
 
 import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
 import org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
 import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -41,7 +43,6 @@
 
 public class ExternalWriterProvider {
     private static final Map<String, IExternalFileWriterFactoryProvider> CREATOR_MAP;
-    private static final Map<String, IExternalFileCompressStreamFactory> STREAM_COMPRESSORS;
 
     private ExternalWriterProvider() {
     }
@@ -50,10 +51,6 @@
         CREATOR_MAP = new HashMap<>();
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER);
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER);
-
-        STREAM_COMPRESSORS = new HashMap<>();
-        STREAM_COMPRESSORS.put(ExternalDataConstants.KEY_COMPRESSION_GZIP,
-                GzipExternalFileCompressStreamFactory.INSTANCE);
     }
 
     public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
@@ -105,7 +102,8 @@
         CREATOR_MAP.put(adapterName.toLowerCase(), creator);
     }
 
-    public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
+    public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink,
+            Object sourceType) {
         Map<String, String> configuration = sink.getConfiguration();
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
 
@@ -116,8 +114,7 @@
 
         String compression = getCompression(configuration);
         IExternalFileCompressStreamFactory compressStreamFactory =
-                STREAM_COMPRESSORS.getOrDefault(compression, NoOpExternalFileCompressStreamFactory.INSTANCE);
-
+                createCompressionStreamFactory(appCtx, compression, configuration);
         IPrinterFactory printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
         return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
     }
@@ -139,4 +136,23 @@
 
         return creator.getSeparator();
     }
+
+    private static IExternalFileCompressStreamFactory createCompressionStreamFactory(ICcApplicationContext appCtx,
+            String compression, Map<String, String> configuration) {
+        if (ExternalDataUtils.isGzipCompression(compression)) {
+            return createGzipStreamFactory(appCtx, configuration);
+        }
+        return NoOpExternalFileCompressStreamFactory.INSTANCE;
+    }
+
+    private static GzipExternalFileCompressStreamFactory createGzipStreamFactory(ICcApplicationContext appCtx,
+            Map<String, String> configuration) {
+        int compressionLevel = Deflater.DEFAULT_COMPRESSION;
+        String gzipCompressionLevel = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL);
+        if (gzipCompressionLevel != null) {
+            compressionLevel = Integer.parseInt(gzipCompressionLevel);
+        }
+        return GzipExternalFileCompressStreamFactory.create(compressionLevel,
+                appCtx.getCompilerProperties().getFrameSize());
+    }
 }