[NO ISSUE][EXT] Avoid duplicate open for streams + minor refactoring
Change-Id: I405e84a30ee67b176c3389db6fd026c408ae1685
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10783
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index ca55b6f..0b215a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -30,6 +30,7 @@
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
@@ -69,9 +70,17 @@
return partitionConstraint;
}
+ /**
+ * Returns the number of partitions available to this factory, computed as the length of the
+ * locations array of the current partition constraint.
+ * NOTE(review): the partition constraint is assigned in configure(); presumably this must only
+ * be called after configure() has run - confirm.
+ *
+ * @return the number of locations in {@link #getPartitionConstraint()}
+ */
+ protected int getPartitionsCount() {
+ return getPartitionConstraint().getLocations().length;
+ }
+
+ /**
+ * Common configuration step shared by the external input stream factories. Stores the supplied
+ * configuration map and sets the partition constraint to the result of
+ * {@code getClusterStateManager().getClusterLocations()} on the application context.
+ * The S3 and Azure factories in this change call {@code super.configure(...)} first and then
+ * perform their source-specific work.
+ *
+ * @param ctx service context; its application context is cast to {@link ICcApplicationContext}
+ * @param configuration configuration map, retained in {@code this.configuration}
+ * @param warningCollector not used by this base implementation
+ * @throws AlgebricksException declared by the signature; this body contains no explicit throw
+ */
@Override
- public abstract void configure(IServiceContext ctx, Map<String, String> configuration,
- IWarningCollector warningCollector) throws AlgebricksException;
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
+ throws AlgebricksException {
+ this.configuration = configuration;
+ this.partitionConstraint =
+ ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
+ }
/**
* Finds the smallest workload and returns it
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index e3e53d5..48035f3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -42,15 +42,17 @@
public class AwsS3InputStream extends AbstractExternalInputStream {
private final S3Client s3Client;
+ private final String bucket;
+ /**
+ * Creates a stream over the given S3 object keys. Builds the S3 client from the configuration
+ * and reads the bucket name once from the {@code ExternalDataConstants.CONTAINER_NAME_FIELD_NAME}
+ * entry, replacing the per-call lookup previously done in getInputStream().
+ *
+ * @param configuration configuration map used to build the client and to look up the bucket name
+ * @param filePaths object keys to read; getInputStream() passes them as the request key
+ * @throws HyracksDataException declared by the signature; presumably raised by the super
+ * constructor or by client construction - confirm
+ */
public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
super(configuration, filePaths);
this.s3Client = buildAwsS3Client(configuration);
+ this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@Override
protected boolean getInputStream() throws IOException {
- String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String fileName = filePaths.get(nextFileIndex);
GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
@@ -67,11 +69,10 @@
}
// Use gzip stream if needed
- String filename = filePaths.get(nextFileIndex).toLowerCase();
- if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
- in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ String lowerCaseFileName = fileName.toLowerCase();
+ if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
+ in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
}
-
return true;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 5bab888..a1c577a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,19 +18,17 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -61,8 +59,7 @@
@Override
public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
throws AlgebricksException {
- this.configuration = configuration;
- ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
+ super.configure(ctx, configuration, warningCollector);
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
@@ -70,7 +67,7 @@
// Prepare to retrieve the objects
List<S3Object> filesOnly;
- String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
try {
@@ -101,12 +98,8 @@
warningCollector.warn(warning);
}
- // Partition constraints
- partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
- int partitionsCount = partitionConstraint.getLocations().length;
-
// Distribute work load amongst the partitions
- distributeWorkLoad(filesOnly, partitionsCount);
+ distributeWorkLoad(filesOnly, getPartitionsCount());
}
/**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
index 358c412..3fb3395 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.input.record.reader.azure;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -43,15 +41,17 @@
public class AzureBlobInputStream extends AbstractExternalInputStream {
private final BlobServiceClient client;
+ private final String container;
+ /**
+ * Creates a stream over the given blob paths. Builds the Azure client from the configuration
+ * and reads the container name once from the {@code ExternalDataConstants.CONTAINER_NAME_FIELD_NAME}
+ * entry, replacing the per-call lookup previously done in getInputStream().
+ *
+ * @param configuration configuration map used to build the client and to look up the container name
+ * @param filePaths paths of the blobs to read
+ * @throws HyracksDataException declared by the signature; presumably raised by the super
+ * constructor or by client construction - confirm
+ */
public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
super(configuration, filePaths);
this.client = buildAzureClient(configuration);
+ this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@Override
protected boolean getInputStream() throws IOException {
- String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+ String fileName = filePaths.get(nextFileIndex);
BlobContainerClient blobContainerClient;
BlobClient blob;
try {
@@ -60,9 +60,9 @@
in = blob.openInputStream();
// Use gzip stream if needed
- String filename = filePaths.get(nextFileIndex).toLowerCase();
- if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
- in = new GZIPInputStream(in = blob.openInputStream(), ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ String lowerCaseFileName = fileName.toLowerCase();
+ if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
+ in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
}
} catch (BlobStorageException ex) {
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
index 167e22a..ca064b1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
@@ -18,19 +18,17 @@
*/
package org.apache.asterix.external.input.record.reader.azure;
-import static org.apache.asterix.external.util.ExternalDataConstants.*;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -56,10 +54,9 @@
@Override
public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
throws AlgebricksException {
- this.configuration = configuration;
- ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
+ super.configure(ctx, configuration, warningCollector);
- String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
List<BlobItem> filesOnly = new ArrayList<>();
@@ -87,12 +84,8 @@
warningCollector.warn(warning);
}
- // Partition constraints
- partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
- int partitionsCount = partitionConstraint.getLocations().length;
-
// Distribute work load amongst the partitions
- distributeWorkLoad(filesOnly, partitionsCount);
+ distributeWorkLoad(filesOnly, getPartitionsCount());
} catch (Exception ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fd5b269..45d15df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -288,6 +288,7 @@
public static final String INVALID_VAL = "invalid value";
public static final String DEFINITION_FIELD_NAME = "definition";
+ // Configuration key for the container name. Shared by the AWS S3 (bucket) and Azure Blob
+ // (container) code paths; replaces the identical per-source constants removed below.
+ public static final String CONTAINER_NAME_FIELD_NAME = "container";
public static class AwsS3 {
private AwsS3() {
@@ -298,7 +299,6 @@
public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
- public static final String CONTAINER_NAME_FIELD_NAME = "container";
public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
}
@@ -307,8 +307,6 @@
throw new AssertionError("do not instantiate");
}
- public static final String CONTAINER_NAME_FIELD_NAME = "container";
- public static final String DEFINITION_FIELD_NAME = "definition";
public static final String CONNECTION_STRING_FIELD_NAME = "connectionString";
public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 3ac1116..d1bbe89 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -776,7 +776,7 @@
S3Client s3Client = buildAwsS3Client(configuration);;
S3Response response;
boolean useOldApi = false;
- String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
String prefix = getPrefix(configuration);
try {
@@ -943,7 +943,7 @@
// Check if the bucket is present
BlobServiceClient blobServiceClient;
try {
- String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
blobServiceClient = buildAzureClient(configuration);
BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);