[NO ISSUE][EXT] Configurable Gzip compression level
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add an option "gzipCompressionLevel" to specify the
compression level (1-9) to be used when gzip is used
with COPY TO.
- Default gzip compression level to -1 (library default)
when not specified.
- Make the compression buffer size equal to a frame size.
Change-Id: I6e80691e232269620d76e6b6f414cff6856f3232
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
index 6df40c3..1d7253b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
@@ -36,7 +36,8 @@
"serviceEndpoint":"http://127.0.0.1:8001",
"container":"playground",
"format":"json",
- "compression":"gzip"
+ "compression":"gzip",
+ "gzipCompressionLevel": "1"
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fc8f527..79252ad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -304,6 +304,7 @@
* Compression constants
*/
public static final String KEY_COMPRESSION_GZIP = "gzip";
+ public static final String KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL = "gzipCompressionLevel";
/**
* Writer Constants
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index ae3b567..b02122d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -1085,4 +1085,8 @@
return ExternalDataConstants.KEY_PATH;
}
}
+
+ public static boolean isGzipCompression(String compression) {
+ return ExternalDataConstants.KEY_COMPRESSION_GZIP.equalsIgnoreCase(compression);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
index 00fe855..b348c1e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.external.util;
+import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -59,6 +62,9 @@
checkSupported(ExternalDataConstants.KEY_WRITER_COMPRESSION, compression,
ExternalDataConstants.WRITER_SUPPORTED_COMPRESSION, ErrorCode.UNKNOWN_COMPRESSION_SCHEME,
sourceLocation, true);
+ if (ExternalDataUtils.isGzipCompression(compression)) {
+ validateGzipCompressionLevel(configuration, sourceLocation);
+ }
}
private static void validateMaxResult(Map<String, String> configuration, SourceLocation sourceLocation)
@@ -92,4 +98,21 @@
}
}
+ private static void validateGzipCompressionLevel(Map<String, String> configuration, SourceLocation sourceLocation)
+ throws CompilationException {
+ String compressionLevelStr = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL);
+ if (compressionLevelStr == null) {
+ return;
+ }
+ try {
+ int compressionLevel = Integer.parseInt(compressionLevelStr);
+ if (compressionLevel < 1 || compressionLevel > 9) {
+ throw new CompilationException(INVALID_REQ_PARAM_VAL, sourceLocation,
+ ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL, compressionLevelStr);
+ }
+ } catch (NumberFormatException e) {
+ throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, compressionLevelStr);
+ }
+ }
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
index 5ef196d..a587487 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
@@ -22,19 +22,30 @@
import java.io.OutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipParameters;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class GzipExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory {
private static final long serialVersionUID = -7364595253362922025L;
- public static IExternalFileCompressStreamFactory INSTANCE = new GzipExternalFileCompressStreamFactory();
+ private final int compressionLevel;
+ private final int bufferSize;
- private GzipExternalFileCompressStreamFactory() {
+ public static GzipExternalFileCompressStreamFactory create(int compressionLevel, int bufferSize) {
+ return new GzipExternalFileCompressStreamFactory(compressionLevel, bufferSize);
+ }
+
+ private GzipExternalFileCompressStreamFactory(int compressionLevel, int bufferSize) {
+ this.compressionLevel = compressionLevel;
+ this.bufferSize = bufferSize;
}
@Override
public OutputStream createStream(OutputStream outputStream) throws HyracksDataException {
try {
- return new GzipCompressorOutputStream(outputStream);
+ GzipParameters gzipParam = new GzipParameters();
+ gzipParam.setCompressionLevel(compressionLevel);
+ gzipParam.setBufferSize(bufferSize);
+ return new GzipCompressorOutputStream(outputStream, gzipParam);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 416e32f..e861a3f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -761,7 +761,7 @@
fileWriterFactory.validate();
String fileExtension = ExternalWriterProvider.getFileExtension(sink);
int maxResult = ExternalWriterProvider.getMaxResult(sink);
- IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
+ IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(appCtx, sink, sourceType);
ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9253a48..cf51200 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -20,10 +20,12 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.zip.Deflater;
import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
import org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -41,7 +43,6 @@
public class ExternalWriterProvider {
private static final Map<String, IExternalFileWriterFactoryProvider> CREATOR_MAP;
- private static final Map<String, IExternalFileCompressStreamFactory> STREAM_COMPRESSORS;
private ExternalWriterProvider() {
}
@@ -50,10 +51,6 @@
CREATOR_MAP = new HashMap<>();
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER);
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER);
-
- STREAM_COMPRESSORS = new HashMap<>();
- STREAM_COMPRESSORS.put(ExternalDataConstants.KEY_COMPRESSION_GZIP,
- GzipExternalFileCompressStreamFactory.INSTANCE);
}
public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
@@ -105,7 +102,8 @@
CREATOR_MAP.put(adapterName.toLowerCase(), creator);
}
- public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
+ public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink,
+ Object sourceType) {
Map<String, String> configuration = sink.getConfiguration();
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
@@ -116,8 +114,7 @@
String compression = getCompression(configuration);
IExternalFileCompressStreamFactory compressStreamFactory =
- STREAM_COMPRESSORS.getOrDefault(compression, NoOpExternalFileCompressStreamFactory.INSTANCE);
-
+ createCompressionStreamFactory(appCtx, compression, configuration);
IPrinterFactory printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
}
@@ -139,4 +136,23 @@
return creator.getSeparator();
}
+
+ private static IExternalFileCompressStreamFactory createCompressionStreamFactory(ICcApplicationContext appCtx,
+ String compression, Map<String, String> configuration) {
+ if (ExternalDataUtils.isGzipCompression(compression)) {
+ return createGzipStreamFactory(appCtx, configuration);
+ }
+ return NoOpExternalFileCompressStreamFactory.INSTANCE;
+ }
+
+ private static GzipExternalFileCompressStreamFactory createGzipStreamFactory(ICcApplicationContext appCtx,
+ Map<String, String> configuration) {
+ int compressionLevel = Deflater.DEFAULT_COMPRESSION;
+ String gzipCompressionLevel = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL);
+ if (gzipCompressionLevel != null) {
+ compressionLevel = Integer.parseInt(gzipCompressionLevel);
+ }
+ return GzipExternalFileCompressStreamFactory.create(compressionLevel,
+ appCtx.getCompilerProperties().getFrameSize());
+ }
}