[ASTERIXDB-3268][STO] Add parallel downloader/cacher
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Download uncached files in parallel. Also, when listing
files, avoid consulting the cloud store for indexes that
have already been fully downloaded.
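
Illustrative sketch of the new flow (hedged; the helper name and
parameters below are made up for illustration, and the calls mirror
EagerCloudIOManager.downloadPartitions() plus the per-file retry
performed by ParallelCacher):

    // Download whole partition directories in parallel, then retry
    // any transfers that failed as individual file downloads.
    static void downloadAll(ICloudClient cloudClient, String bucket, IOManager localIoManager,
            Collection<FileReference> partitionPaths) throws HyracksDataException {
        IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
        try {
            Collection<FileReference> failed = downloader.downloadDirectories(partitionPaths);
            if (!failed.isEmpty()) {
                downloader.downloadFiles(failed);
            }
        } finally {
            downloader.close();
        }
    }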
Change-Id: I5f48da0f506200f35fd9b08e51983ef7ad919f6d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17791
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 542e6ae..9e840c2 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -94,7 +94,6 @@
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
- <version>0.21.10</version>
</dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index d9c248e..7243188 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -139,6 +139,7 @@
}
}
} else {
+ LOGGER.info("Cleaning node partitions...");
for (FileReference partitionPath : partitionPaths) {
CloudFileUtil.cleanDirectoryFiles(localIoManager, cloudFiles, partitionPath);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index f869f37..c095d87 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -22,11 +22,10 @@
import java.io.File;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -57,14 +56,10 @@
@Override
protected void downloadPartitions() throws HyracksDataException {
- // TODO currently it throws an error in local test
- Map<String, String> cloudToLocalStoragePaths = new HashMap<>();
- for (FileReference partitionPath : partitionPaths) {
- String cloudStoragePath = STORAGE_ROOT_DIR_NAME + "/" + partitionPath.getName();
- cloudToLocalStoragePaths.put(cloudStoragePath, partitionPath.getAbsolutePath());
- }
- LOGGER.info("Resolved paths to io devices: {}", cloudToLocalStoragePaths);
- cloudClient.syncFiles(bucket, cloudToLocalStoragePaths);
+ IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
+ LOGGER.info("Downloading all files located in {}", partitionPaths);
+ downloader.downloadDirectories(partitionPaths);
+ LOGGER.info("Finished downloading {}", partitionPaths);
}
@Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 01f684b..6e00817 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -21,11 +21,15 @@
import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
import java.io.FilenameFilter;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.lazy.ParallelCacher;
import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
import org.apache.asterix.cloud.lazy.accessor.ILazyAccessorReplacer;
import org.apache.asterix.cloud.lazy.accessor.InitialCloudAccessor;
@@ -54,7 +58,7 @@
public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
super(ioManager, cloudProperties);
- accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager, writeBufferProvider);
+ accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
replacer = () -> {
synchronized (this) {
if (!accessor.isLocalAccessor()) {
@@ -92,9 +96,11 @@
cloudFiles.removeAll(localFiles);
int remainingUncachedFiles = cloudFiles.size();
if (remainingUncachedFiles > 0) {
+ List<FileReference> uncachedFiles = resolve(cloudFiles);
+ IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
+ ParallelCacher cacher = new ParallelCacher(downloader, uncachedFiles);
// Local cache misses some files, cloud-based accessor is needed for read operations
- accessor = new ReplaceableCloudAccessor(cloudClient, bucket, localIoManager, partitions,
- remainingUncachedFiles, writeBufferProvider, replacer);
+ accessor = new ReplaceableCloudAccessor(cloudClient, bucket, localIoManager, partitions, replacer, cacher);
} else {
// Everything is cached, no need to invoke cloud-based accessor for read operations
accessor = new LocalAccessor(cloudClient, bucket, localIoManager);
@@ -152,6 +158,14 @@
log("WRITE", fileRef);
}
+ private List<FileReference> resolve(Set<String> cloudFiles) throws HyracksDataException {
+ List<FileReference> fileReferences = new ArrayList<>();
+ for (String file : cloudFiles) {
+ fileReferences.add(resolve(file));
+ }
+ return fileReferences;
+ }
+
private void log(String op, FileReference fileReference) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} {}", op, fileReference.getRelativePath());
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index 3135624..1c7713a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -50,13 +50,17 @@
* Actually, is there a case where we delete multiple directories from the cloud?
*/
List<String> paths = fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toList());
+ if (paths.isEmpty()) {
+ return 0;
+ }
+
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Bulk deleting: local: {}, cloud: {}", fileReferences, paths);
}
cloudClient.deleteObjects(bucket, paths);
// Bulk delete locally as well
- int localDeletes = super.performOperation();
- callBack.call(localDeletes, paths);
- return paths.size();
+ super.performOperation();
+ callBack.call(fileReferences);
+ return fileReferences.size();
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
index 14a0c4e..f0b336d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/IBulkOperationCallBack.java
@@ -18,8 +18,10 @@
*/
package org.apache.asterix.cloud.bulk;
-import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.io.FileReference;
public interface IBulkOperationCallBack {
- void call(int numberOfAffectedLocalFiles, Collection<String> paths);
+ void call(List<FileReference> fileReferences);
}
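
Reviewer note: the callback now reports the deleted FileReferences
rather than a count. A hedged sketch of how it is wired (mirrors
ReplaceableCloudAccessor below; cacher and replace() stand in for
that class's members and are illustrative only):

    IBulkOperationCallBack deleteCallBack = deletedFiles -> {
        // Deleted files are no longer "uncached"; once nothing uncached
        // remains, the cloud-backed accessor can be replaced.
        if (cacher.remove(deletedFiles)) {
            replace();
        }
    };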
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
index c877be2..6e89cb0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/NoOpDeleteBulkCallBack.java
@@ -18,7 +18,9 @@
*/
package org.apache.asterix.cloud.bulk;
-import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.io.FileReference;
public class NoOpDeleteBulkCallBack implements IBulkOperationCallBack {
public static final IBulkOperationCallBack INSTANCE = new NoOpDeleteBulkCallBack();
@@ -27,7 +29,7 @@
}
@Override
- public void call(int numberOfAffectedLocalFiles, Collection<String> paths) {
+ public void call(List<FileReference> fileReferences) {
// NoOp
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index f2b7ff4..76a768a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -22,11 +22,11 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Map;
import java.util.Set;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -131,13 +131,13 @@
boolean exists(String bucket, String path) throws HyracksDataException;
/**
- * Syncs files by downloading them from cloud storage to local storage
+ * Creates a parallel downloader
*
- * @param bucket bucket to sync from
- * @param cloudToLocalStoragePaths map of cloud storage partition to local storage path
- * @throws HyracksDataException HyracksDataException
+ * @param bucket bucket
+ * @param ioManager local {@link IOManager}
+ * @return an instance of a new parallel downloader
*/
- void syncFiles(String bucket, Map<String, String> cloudToLocalStoragePaths) throws HyracksDataException;
+ IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager);
/**
* Produces a {@link JsonNode} that contains information about the stored objects in the cloud
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
new file mode 100644
index 0000000..32d0a74
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.util.Collection;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+public interface IParallelDownloader {
+
+ /**
+ * Downloads files in all partitions
+ *
+ * @param toDownload all files to be downloaded
+ */
+ void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException;
+
+ /**
+ * Downloads all files under the provided directories
+ *
+ * @param toDownload directories whose files should be downloaded
+ * @return files that failed to download
+ */
+ Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) throws HyracksDataException;
+
+ /**
+ * Close the downloader and release all of its resources
+ */
+ void close();
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index fe1f4af..56ed3cf 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -25,7 +25,8 @@
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
public class S3ClientConfig {
-
+ // The maximum number of files that can be deleted in a single bulk request (AWS restriction)
+ static final int DELETE_BATCH_SIZE = 1000;
private final String region;
private final String endpoint;
private final String prefix;
@@ -59,7 +60,7 @@
return prefix;
}
- public boolean isEncodeKeys() {
+ public boolean isLocalS3Provider() {
// to workaround https://github.com/findify/s3mock/issues/187 in our S3Mock, we encode/decode keys
return isS3Mock();
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 5faf663..02e1d4f 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -18,40 +18,33 @@
*/
package org.apache.asterix.cloud.clients.aws.s3;
+import static org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.encodeURI;
import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.listS3Objects;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
-import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.util.IoUtil;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.control.nc.io.IOManager;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,10 +54,8 @@
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
@@ -75,23 +66,11 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
-import software.amazon.awssdk.transfer.s3.S3TransferManager;
-import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
-import software.amazon.awssdk.transfer.s3.model.DirectoryDownload;
-import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
public class S3CloudClient implements ICloudClient {
-
- private static final Logger LOGGER = LogManager.getLogger();
- // TODO(htowaileb): Temporary variables, can we get this from the used instance?
- private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
- // The maximum number of file that can be deleted (AWS restriction)
- private static final int DELETE_BATCH_SIZE = 1000;
-
private final S3ClientConfig config;
private final S3Client s3Client;
private final IRequestProfiler profiler;
- private S3TransferManager s3TransferManager;
public S3CloudClient(S3ClientConfig config) {
this.config = config;
@@ -113,7 +92,7 @@
@Override
public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
profiler.objectsList();
- path = config.isEncodeKeys() ? encodeURI(path) : path;
+ path = config.isLocalS3Provider() ? encodeURI(path) : path;
return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
}
@@ -181,7 +160,7 @@
@Override
public void copy(String bucket, String srcPath, FileReference destPath) {
- srcPath = config.isEncodeKeys() ? encodeURI(srcPath) : srcPath;
+ srcPath = config.isLocalS3Provider() ? encodeURI(srcPath) : srcPath;
List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
profiler.objectsList();
@@ -243,50 +222,8 @@
}
@Override
- public void syncFiles(String bucket, Map<String, String> cloudToLocalStoragePaths) throws HyracksDataException {
- LOGGER.info("Syncing cloud storage to local storage started");
-
- S3TransferManager s3TransferManager = getS3TransferManager();
-
- List<CompletableFuture<CompletedDirectoryDownload>> downloads = new ArrayList<>();
- cloudToLocalStoragePaths.forEach((cloudStoragePath, localStoragePath) -> {
- DownloadDirectoryRequest.Builder builder = DownloadDirectoryRequest.builder();
- builder.bucket(bucket);
- builder.destination(Paths.get(localStoragePath));
- builder.listObjectsV2RequestTransformer(l -> l.prefix(cloudStoragePath));
-
- LOGGER.info("TransferManager started downloading from cloud \"{}\" to local storage \"{}\"",
- cloudStoragePath, localStoragePath);
- DirectoryDownload directoryDownload = s3TransferManager.downloadDirectory(builder.build());
- downloads.add(directoryDownload.completionFuture());
- });
-
- try {
- for (CompletableFuture<CompletedDirectoryDownload> download : downloads) {
- // multipart download
- profiler.objectMultipartDownload();
- download.join();
- CompletedDirectoryDownload completedDirectoryDownload = download.get();
-
- // if we have failed downloads with transfer manager, try to download them with GetObject
- if (!completedDirectoryDownload.failedTransfers().isEmpty()) {
- LOGGER.warn("TransferManager failed to download file(s), will retry to download each separately");
- completedDirectoryDownload.failedTransfers().forEach(LOGGER::warn);
-
- Map<String, String> failedFiles = new HashMap<>();
- completedDirectoryDownload.failedTransfers().forEach(failed -> {
- String cloudStoragePath = failed.request().getObjectRequest().key();
- String localStoragePath = failed.request().destination().toAbsolutePath().toString();
- failedFiles.put(cloudStoragePath, localStoragePath);
- });
- downloadFiles(bucket, failedFiles);
- }
- LOGGER.info("TransferManager finished downloading {} to local storage", completedDirectoryDownload);
- }
- } catch (ExecutionException | InterruptedException e) {
- throw HyracksDataException.create(e);
- }
- LOGGER.info("Syncing cloud storage to local storage successful");
+ public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) {
+ return new S3ParallelDownloader(bucket, ioManager, config, profiler);
}
@Override
@@ -305,13 +242,7 @@
@Override
public void close() {
- if (s3Client != null) {
- s3Client.close();
- }
-
- if (s3TransferManager != null) {
- s3TransferManager.close();
- }
+ s3Client.close();
}
private S3Client buildClient() {
@@ -333,65 +264,11 @@
private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
Set<String> files = new HashSet<>();
for (S3Object s3Object : contents) {
- String path = config.isEncodeKeys() ? S3Utils.decodeURI(s3Object.key()) : s3Object.key();
+ String path = config.isLocalS3Provider() ? S3Utils.decodeURI(s3Object.key()) : s3Object.key();
if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
files.add(path);
}
}
return files;
}
-
- private void downloadFiles(String bucket, Map<String, String> cloudToLocalStoragePaths)
- throws HyracksDataException {
- byte[] buffer = new byte[8 * 1024];
- for (Map.Entry<String, String> entry : cloudToLocalStoragePaths.entrySet()) {
- String cloudStoragePath = entry.getKey();
- String localStoragePath = entry.getValue();
-
- LOGGER.info("GetObject started downloading from cloud \"{}\" to local storage \"{}\"", cloudStoragePath,
- localStoragePath);
-
- // TODO(htowaileb): add retry logic here
- try {
- File localFile = new File(localStoragePath);
- FileUtils.createParentDirectories(localFile);
- if (!localFile.createNewFile()) {
- // do nothing for now, a restart has the files when trying to flush, for testing
- //throw new IllegalStateException("Couldn't create local file");
- }
-
- try (InputStream inputStream = getObjectStream(bucket, cloudStoragePath);
- FileOutputStream outputStream = new FileOutputStream(localFile)) {
- int bytesRead;
- while ((bytesRead = inputStream.read(buffer)) != -1) {
- outputStream.write(buffer, 0, bytesRead);
- }
- }
- } catch (IOException ex) {
- throw HyracksDataException.create(ex);
- }
- LOGGER.info("GetObject successful downloading from cloud \"{}\" to local storage \"{}\"", cloudStoragePath,
- localStoragePath);
- }
- }
-
- private S3TransferManager getS3TransferManager() {
- if (s3TransferManager != null) {
- return s3TransferManager;
- }
-
- S3CrtAsyncClientBuilder builder = S3AsyncClient.crtBuilder();
- builder.credentialsProvider(config.createCredentialsProvider());
- builder.region(Region.of(config.getRegion()));
- builder.targetThroughputInGbps(MAX_HOST_BANDWIDTH);
- builder.minimumPartSizeInBytes((long) 8 * 1024 * 1024);
-
- if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
- builder.endpointOverride(URI.create(config.getEndpoint()));
- }
-
- S3AsyncClient client = builder.build();
- s3TransferManager = S3TransferManager.builder().s3Client(client).build();
- return s3TransferManager;
- }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
new file mode 100644
index 0000000..c5d9b73
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.transfer.s3.S3TransferManager;
+import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
+import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
+import software.amazon.awssdk.transfer.s3.model.DirectoryDownload;
+import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
+import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
+import software.amazon.awssdk.transfer.s3.model.FailedFileDownload;
+import software.amazon.awssdk.transfer.s3.model.FileDownload;
+
+@ThreadSafe
+class S3ParallelDownloader implements IParallelDownloader {
+ private final String bucket;
+ private final IOManager ioManager;
+ private final S3AsyncClient s3AsyncClient;
+ private final S3TransferManager transferManager;
+ private final IRequestProfiler profiler;
+
+ S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfiler profiler) {
+ this.bucket = bucket;
+ this.ioManager = ioManager;
+ this.profiler = profiler;
+ s3AsyncClient = createAsyncClient(config);
+ transferManager = createS3TransferManager(s3AsyncClient);
+ }
+
+ @Override
+ public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+ try {
+ List<CompletableFuture<CompletedFileDownload>> downloads = startDownloadingFiles(toDownload);
+ waitForFileDownloads(downloads);
+ } catch (IOException | ExecutionException | InterruptedException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+ throws HyracksDataException {
+ Set<FileReference> failedFiles;
+ List<CompletableFuture<CompletedDirectoryDownload>> downloads = startDownloadingDirectories(toDownload);
+ try {
+ failedFiles = waitForDirectoryDownloads(downloads);
+ } catch (ExecutionException | InterruptedException e) {
+ throw HyracksDataException.create(e);
+ }
+
+ return failedFiles;
+ }
+
+ @Override
+ public void close() {
+ transferManager.close();
+ s3AsyncClient.close();
+ }
+
+ private List<CompletableFuture<CompletedFileDownload>> startDownloadingFiles(Collection<FileReference> toDownload)
+ throws IOException {
+ List<CompletableFuture<CompletedFileDownload>> downloads = new ArrayList<>();
+ for (FileReference fileReference : toDownload) {
+ // object download
+ profiler.objectGet();
+
+ // Create parent directories
+ FileUtils.createParentDirectories(fileReference.getFile());
+
+ // GetObjectRequest
+ GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder();
+ requestBuilder.bucket(bucket);
+ requestBuilder.key(fileReference.getRelativePath());
+
+ // Download object
+ DownloadFileRequest.Builder builder = DownloadFileRequest.builder();
+ builder.getObjectRequest(requestBuilder.build());
+ builder.destination(fileReference.getFile());
+
+ FileDownload fileDownload = transferManager.downloadFile(builder.build());
+ downloads.add(fileDownload.completionFuture());
+ }
+ return downloads;
+ }
+
+ private void waitForFileDownloads(List<CompletableFuture<CompletedFileDownload>> downloads)
+ throws ExecutionException, InterruptedException {
+
+ for (CompletableFuture<CompletedFileDownload> download : downloads) {
+ download.get();
+ }
+ }
+
+ private List<CompletableFuture<CompletedDirectoryDownload>> startDownloadingDirectories(
+ Collection<FileReference> toDownload) {
+ List<CompletableFuture<CompletedDirectoryDownload>> downloads = new ArrayList<>();
+ for (FileReference fileReference : toDownload) {
+ DownloadDirectoryRequest.Builder builder = DownloadDirectoryRequest.builder();
+ builder.bucket(bucket);
+ builder.destination(fileReference.getFile().toPath());
+ builder.listObjectsV2RequestTransformer(l -> l.prefix(fileReference.getRelativePath()));
+ DirectoryDownload directoryDownload = transferManager.downloadDirectory(builder.build());
+ downloads.add(directoryDownload.completionFuture());
+ }
+ return downloads;
+ }
+
+ private Set<FileReference> waitForDirectoryDownloads(List<CompletableFuture<CompletedDirectoryDownload>> downloads)
+ throws ExecutionException, InterruptedException, HyracksDataException {
+ Set<FileReference> failedFiles = Collections.emptySet();
+ for (CompletableFuture<CompletedDirectoryDownload> download : downloads) {
+ // multipart download
+ profiler.objectMultipartDownload();
+ download.join();
+ CompletedDirectoryDownload completedDirectoryDownload = download.get();
+
+ // collect transfers that the transfer manager failed to download so the caller can retry them individually
+ if (!completedDirectoryDownload.failedTransfers().isEmpty()) {
+ failedFiles = failedFiles.isEmpty() ? new HashSet<>() : failedFiles;
+ for (FailedFileDownload failedFileDownload : completedDirectoryDownload.failedTransfers()) {
+ FileReference failedFile = ioManager.resolve(failedFileDownload.request().getObjectRequest().key());
+ failedFiles.add(failedFile);
+ }
+ }
+ }
+ return failedFiles;
+ }
+
+ private static S3AsyncClient createAsyncClient(S3ClientConfig config) {
+ if (config.isLocalS3Provider()) {
+ // CRT client is not supported by S3Mock
+ return createS3AsyncClient(config);
+ } else {
+ // The CRT client can provide better performance when used against an actual S3 endpoint
+ return createS3CrtAsyncClient(config);
+ }
+ }
+
+ private static S3AsyncClient createS3AsyncClient(S3ClientConfig config) {
+ S3AsyncClientBuilder builder = S3AsyncClient.builder();
+ builder.credentialsProvider(config.createCredentialsProvider());
+ builder.region(Region.of(config.getRegion()));
+
+ if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+ builder.endpointOverride(URI.create(config.getEndpoint()));
+ }
+
+ return builder.build();
+ }
+
+ private static S3AsyncClient createS3CrtAsyncClient(S3ClientConfig config) {
+ S3CrtAsyncClientBuilder builder = S3AsyncClient.crtBuilder();
+ builder.credentialsProvider(config.createCredentialsProvider());
+ builder.region(Region.of(config.getRegion()));
+
+ if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
+ builder.endpointOverride(URI.create(config.getEndpoint()));
+ }
+
+ return builder.build();
+ }
+
+ private S3TransferManager createS3TransferManager(S3AsyncClient s3AsyncClient) {
+ return S3TransferManager.builder().s3Client(s3AsyncClient).build();
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
new file mode 100644
index 0000000..7e4bc4f
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy;
+
+import java.util.Collection;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+public interface IParallelCacher {
+ /**
+ * Checks whether an index's data and metadata files are already cached
+ *
+ * @param indexDir index directory
+ * @return true if the index is already cached, false otherwise
+ */
+ boolean isCached(FileReference indexDir);
+
+ /**
+ * Downloads all of the index's data files for all partitions.
+ * The index is inferred from the path of the provided file.
+ *
+ * @param indexFile a file reference in an index
+ * @return true if the remaining number of uncached files is zero, false otherwise
+ */
+ boolean downloadData(FileReference indexFile) throws HyracksDataException;
+
+ /**
+ * Downloads all of the index's metadata files for all partitions.
+ * The index is inferred from the path of the provided file.
+ *
+ * @param indexFile a file reference in an index
+ * @return true if the remaining number of uncached files is zero, false otherwise
+ */
+ boolean downloadMetadata(FileReference indexFile) throws HyracksDataException;
+
+ /**
+ * Remove the deleted files from the uncached file set
+ *
+ * @param deletedFiles all deleted files
+ * @return true if the remaining number of uncached files is zero, false otherwise
+ */
+ boolean remove(Collection<FileReference> deletedFiles);
+
+ /**
+ * Remove the deleted file from the uncached file set
+ *
+ * @param deletedFile the deleted file
+ * @return true if the remaining number of uncached files is zero, false otherwise
+ */
+ boolean remove(FileReference deletedFile);
+
+ /**
+ * Close cacher resources
+ */
+ void close();
+}
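
Reviewer note: a hedged sketch of how an accessor consults this
cacher (an outline of the ReplaceableCloudAccessor changes below;
localIoManager, cacher, cloudBackedList() and replace() are that
class's members and are shown here for illustration only):

    // Listing skips the cloud store once an index is fully cached.
    Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
        if (cacher.isCached(dir)) {
            return localIoManager.list(dir, filter);
        }
        return cloudBackedList(dir, filter);
    }

    // Opening an uncached file downloads the whole index; when nothing
    // uncached remains, the local-only accessor takes over.
    void open(CloudFileHandle fileHandle) throws HyracksDataException {
        FileReference fileRef = fileHandle.getFileReference();
        if (!localIoManager.exists(fileRef) && cacher.downloadData(fileRef)) {
            replace();
        }
    }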
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
new file mode 100644
index 0000000..010433d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy;
+
+import java.util.Collection;
+
+import org.apache.hyracks.api.io.FileReference;
+
+public class NoOpParallelCacher implements IParallelCacher {
+ public static final IParallelCacher INSTANCE = new NoOpParallelCacher();
+
+ @Override
+ public boolean isCached(FileReference indexDir) {
+ return false;
+ }
+
+ @Override
+ public boolean downloadData(FileReference indexFile) {
+ return false;
+ }
+
+ @Override
+ public boolean downloadMetadata(FileReference indexFile) {
+ return false;
+ }
+
+ @Override
+ public boolean remove(Collection<FileReference> deletedFiles) {
+ return false;
+ }
+
+ @Override
+ public boolean remove(FileReference deletedFile) {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ // NoOp
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
new file mode 100644
index 0000000..2d55d06
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.lazy;
+
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
+
+import java.io.FilenameFilter;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A parallel cacher that maintains and downloads (in parallel) all uncached files
+ *
+ * @see org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor
+ */
+public final class ParallelCacher implements IParallelCacher {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final FilenameFilter METADATA_FILTER =
+ ((dir, name) -> name.startsWith(StorageConstants.INDEX_NON_DATA_FILES_PREFIX));
+
+ private final IParallelDownloader downloader;
+
+ /**
+ * Subpaths of the uncached indexes
+ */
+ private final Set<String> uncachedIndexes;
+
+ /**
+ * All uncached data files
+ * Example: BTree files
+ */
+ private final Set<FileReference> uncachedDataFiles;
+
+ /**
+ * All uncached metadata files
+ * Example: index checkpoint files
+ */
+ private final Set<FileReference> uncachedMetadataFiles;
+
+ public ParallelCacher(IParallelDownloader downloader, List<FileReference> uncachedFiles) {
+ this.downloader = downloader;
+ uncachedDataFiles = getFiles(uncachedFiles, COMPONENT_FILES_FILTER);
+ uncachedMetadataFiles = getFiles(uncachedFiles, METADATA_FILTER);
+ uncachedIndexes = getUncachedIndexes(uncachedFiles);
+ }
+
+ @Override
+ public boolean isCached(FileReference indexDir) {
+ String relativePath = indexDir.getRelativePath();
+ if (relativePath.endsWith(StorageConstants.STORAGE_ROOT_DIR_NAME)
+ || relativePath.startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)) {
+ return false;
+ }
+ String indexSubPath = StoragePathUtil.getIndexSubPath(indexDir, true);
+ return !indexSubPath.isEmpty() && !uncachedIndexes.contains(indexSubPath);
+ }
+
+ @Override
+ public synchronized boolean downloadData(FileReference indexFile) throws HyracksDataException {
+ String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
+ Set<FileReference> toDownload = new HashSet<>();
+ for (FileReference fileReference : uncachedDataFiles) {
+ if (fileReference.getRelativePath().contains(indexSubPath)) {
+ toDownload.add(fileReference.getParent());
+ }
+ }
+
+ LOGGER.info("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
+ Collection<FileReference> failed = downloader.downloadDirectories(toDownload);
+ if (!failed.isEmpty()) {
+ LOGGER.warn("Failed to download data files {}. Re-downloading: {}", indexSubPath, failed);
+ downloader.downloadFiles(failed);
+ }
+ LOGGER.info("Finished downloading data files for {}", indexSubPath);
+ uncachedIndexes.remove(indexSubPath);
+ uncachedDataFiles.removeIf(f -> f.getRelativePath().contains(indexSubPath));
+ return isEmpty();
+ }
+
+ @Override
+ public synchronized boolean downloadMetadata(FileReference indexFile) throws HyracksDataException {
+ String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
+ Set<FileReference> toDownload = new HashSet<>();
+ for (FileReference fileReference : uncachedMetadataFiles) {
+ if (fileReference.getRelativePath().contains(indexSubPath)) {
+ toDownload.add(fileReference);
+ }
+ }
+
+ LOGGER.info("Downloading metadata files for {} in all partitions: {}", indexSubPath, toDownload);
+ downloader.downloadFiles(toDownload);
+ LOGGER.info("Finished downloading metadata files for {}", indexSubPath);
+ uncachedMetadataFiles.removeAll(toDownload);
+ return isEmpty();
+ }
+
+ @Override
+ public boolean remove(Collection<FileReference> deletedFiles) {
+ LOGGER.info("Deleting {}", deletedFiles);
+ for (FileReference fileReference : deletedFiles) {
+ remove(fileReference);
+ }
+
+ return isEmpty();
+ }
+
+ @Override
+ public boolean remove(FileReference deletedFile) {
+ LOGGER.info("Deleting {}", deletedFile);
+ if (COMPONENT_FILES_FILTER.accept(null, deletedFile.getName())) {
+ uncachedDataFiles.remove(deletedFile);
+ } else {
+ uncachedMetadataFiles.remove(deletedFile);
+ }
+
+ return isEmpty();
+ }
+
+ @Override
+ public void close() {
+ downloader.close();
+ LOGGER.info("Parallel cacher was closed");
+ }
+
+ private Set<FileReference> getFiles(List<FileReference> uncachedFiles, FilenameFilter filter) {
+ Set<FileReference> fileReferences = ConcurrentHashMap.newKeySet();
+ for (FileReference fileReference : uncachedFiles) {
+ if (filter.accept(null, fileReference.getName())) {
+ fileReferences.add(fileReference);
+ }
+ }
+ return fileReferences;
+ }
+
+ private Set<String> getUncachedIndexes(List<FileReference> uncachedFiles) {
+ Set<String> uncachedIndexes = ConcurrentHashMap.newKeySet();
+ for (FileReference indexFile : uncachedFiles) {
+ uncachedIndexes.add(StoragePathUtil.getIndexSubPath(indexFile, false));
+ }
+ return uncachedIndexes;
+ }
+
+ private synchronized boolean isEmpty() {
+ int totalSize = uncachedDataFiles.size() + uncachedMetadataFiles.size();
+ LOGGER.info("Current number of uncached files {}", totalSize);
+ return totalSize == 0;
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
index de7efc1..c7ce222 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/AbstractLazyAccessor.java
@@ -42,18 +42,23 @@
this.localIoManager = localIoManager;
}
- int doCloudDelete(FileReference fileReference) throws HyracksDataException {
- int numberOfCloudDeletes = 0;
+ Set<FileReference> doCloudDelete(FileReference fileReference) throws HyracksDataException {
+ Set<FileReference> deletedFiles = Collections.emptySet();
if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileReference.getAbsolutePath()))) {
File localFile = fileReference.getFile();
- // if file reference exists,and it is a file, then list is not required
- Set<String> paths =
- localFile.exists() && localFile.isFile() ? Collections.singleton(fileReference.getRelativePath())
- : doList(fileReference, IoUtil.NO_OP_FILTER).stream().map(FileReference::getRelativePath)
- .collect(Collectors.toSet());
+
+ Set<String> paths;
+ if (localFile.exists() && localFile.isFile()) {
+ // If file reference exists, and it is a file, then list is not required
+ paths = Collections.singleton(fileReference.getRelativePath());
+ } else {
+ // List and delete
+ deletedFiles = doList(fileReference, IoUtil.NO_OP_FILTER);
+ paths = deletedFiles.stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
+ }
+
cloudClient.deleteObjects(bucket, paths);
- numberOfCloudDeletes = paths.size();
}
- return numberOfCloudDeletes;
+ return deletedFiles;
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
index b93da54..798163d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/InitialCloudAccessor.java
@@ -20,8 +20,8 @@
import java.util.Collections;
-import org.apache.asterix.cloud.WriteBufferProvider;
import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.lazy.NoOpParallelCacher;
import org.apache.hyracks.control.nc.io.IOManager;
/**
@@ -32,18 +32,7 @@
private static final ILazyAccessorReplacer NO_OP_REPLACER = () -> {
};
- public InitialCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
- WriteBufferProvider writeBufferProvider) {
- super(cloudClient, bucket, localIoManager, Collections.emptySet(), 0, writeBufferProvider, NO_OP_REPLACER);
- }
-
- @Override
- protected void decrementNumberOfUncachedFiles() {
- // No Op
- }
-
- @Override
- protected void decrementNumberOfUncachedFiles(int count) {
- // No Op
+ public InitialCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager) {
+ super(cloudClient, bucket, localIoManager, Collections.emptySet(), NO_OP_REPLACER, NoOpParallelCacher.INSTANCE);
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index 19873e8..3dd579b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -19,48 +19,39 @@
package org.apache.asterix.cloud.lazy.accessor;
import java.io.FilenameFilter;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import org.apache.asterix.cloud.CloudFileHandle;
-import org.apache.asterix.cloud.WriteBufferProvider;
import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.cloud.lazy.IParallelCacher;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
/**
* ReplaceableCloudAccessor will be used when some (or all) of the files in the cloud storage are not cached locally.
* It will be replaced by {@link LocalAccessor} once everything is cached
*/
public class ReplaceableCloudAccessor extends AbstractLazyAccessor {
- private static final Logger LOGGER = LogManager.getLogger();
private final Set<Integer> partitions;
- private final AtomicInteger numberOfUncachedFiles;
- private final WriteBufferProvider writeBufferProvider;
private final ILazyAccessorReplacer replacer;
- private final IBulkOperationCallBack callBack;
+ private final IParallelCacher cacher;
+ private final IBulkOperationCallBack deleteCallBack;
public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
- Set<Integer> partitions, int numberOfUncachedFiles, WriteBufferProvider writeBufferProvider,
- ILazyAccessorReplacer replacer) {
+ Set<Integer> partitions, ILazyAccessorReplacer replacer, IParallelCacher cacher) {
super(cloudClient, bucket, localIoManager);
this.partitions = partitions;
- this.numberOfUncachedFiles = new AtomicInteger(numberOfUncachedFiles);
- this.writeBufferProvider = writeBufferProvider;
this.replacer = replacer;
- this.callBack = (numberOfAffectedLocalFiles, paths) -> {
- int totalUncached = paths.size() - numberOfAffectedLocalFiles;
- replaceAccessor(this.numberOfUncachedFiles.addAndGet(-totalUncached));
+ this.cacher = cacher;
+ deleteCallBack = deletedFiles -> {
+ if (cacher.remove(deletedFiles)) {
+ replace();
+ }
};
}
@@ -71,7 +62,7 @@
@Override
public IBulkOperationCallBack getBulkOperationCallBack() {
- return callBack;
+ return deleteCallBack;
}
@Override
@@ -79,25 +70,65 @@
IIOManager.FileSyncMode syncMode) throws HyracksDataException {
FileReference fileRef = fileHandle.getFileReference();
if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
- // File doesn't exist locally, download it.
- ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
- try {
- // TODO download for all partitions at once
- LOGGER.info("Downloading {} ..", fileRef.getRelativePath());
- CloudFileUtil.downloadFile(localIoManager, cloudClient, bucket, fileHandle, rwMode, syncMode,
- writeBuffer);
- localIoManager.close(fileHandle);
- LOGGER.info("Finished downloading {}..", fileRef.getRelativePath());
- } finally {
- writeBufferProvider.recycle(writeBuffer);
+ if (cacher.downloadData(fileRef)) {
+ replace();
}
- // TODO decrement by the number of downloaded files in all partitions (once the above TODO is fixed)
- decrementNumberOfUncachedFiles();
}
}
@Override
public Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+ if (cacher.isCached(dir)) {
+ return localIoManager.list(dir, filter);
+ }
+ return cloudBackedList(dir, filter);
+ }
+
+ @Override
+ public boolean doExists(FileReference fileRef) throws HyracksDataException {
+ return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
+ }
+
+ @Override
+ public long doGetSize(FileReference fileReference) throws HyracksDataException {
+ if (localIoManager.exists(fileReference)) {
+ return localIoManager.getSize(fileReference);
+ }
+ return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+ }
+
+ @Override
+ public byte[] doReadAllBytes(FileReference fileRef) throws HyracksDataException {
+ if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
+ if (cacher.downloadMetadata(fileRef)) {
+ replace();
+ }
+ }
+ return localIoManager.readAllBytes(fileRef);
+ }
+
+ @Override
+ public void doDelete(FileReference fileReference) throws HyracksDataException {
+ // Never delete the storage dir in cloud storage
+ Set<FileReference> deletedFiles = doCloudDelete(fileReference);
+ if (cacher.remove(deletedFiles)) {
+ replace();
+ }
+ // Finally, delete locally
+ localIoManager.delete(fileReference);
+ }
+
+ @Override
+ public void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException {
+ boolean existsLocally = localIoManager.exists(fileReference);
+ cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
+ localIoManager.overwrite(fileReference, bytes);
+ if (!existsLocally && cacher.remove(fileReference)) {
+ replace();
+ }
+ }
+
+ private Set<FileReference> cloudBackedList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
if (cloudFiles.isEmpty()) {
return Collections.emptySet();
@@ -127,93 +158,12 @@
return localFiles;
}
- @Override
- public boolean doExists(FileReference fileRef) throws HyracksDataException {
- return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
- }
-
- @Override
- public long doGetSize(FileReference fileReference) throws HyracksDataException {
- if (localIoManager.exists(fileReference)) {
- return localIoManager.getSize(fileReference);
- }
- return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
- }
-
- @Override
- public byte[] doReadAllBytes(FileReference fileRef) throws HyracksDataException {
- if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
- byte[] bytes = cloudClient.readAllBytes(bucket, fileRef.getRelativePath());
- if (bytes != null && !partitions.isEmpty()) {
- // Download the missing file for subsequent reads
- LOGGER.info("Downloading {} ..", fileRef.getRelativePath());
- localIoManager.overwrite(fileRef, bytes);
- decrementNumberOfUncachedFiles();
- }
- return bytes;
- }
- return localIoManager.readAllBytes(fileRef);
- }
-
- @Override
- public void doDelete(FileReference fileReference) throws HyracksDataException {
- // Never delete the storage dir in cloud storage
- int numberOfCloudDeletes = doCloudDelete(fileReference);
- // check local
- if (numberOfCloudDeletes > 0) {
- int numberOfLocalDeletes;
- if (numberOfCloudDeletes == 1) {
- // file delete
- numberOfLocalDeletes = localIoManager.exists(fileReference) ? 1 : 0;
- } else {
- // directory delete
- Set<String> localToBeDeleted = localIoManager.list(fileReference).stream()
- .map(FileReference::getRelativePath).collect(Collectors.toSet());
- numberOfLocalDeletes = localToBeDeleted.size();
- }
- // Decrement by number of cloud deletes that have no counterparts locally
- decrementNumberOfUncachedFiles(numberOfCloudDeletes - numberOfLocalDeletes);
- }
-
- // Finally, delete locally
- localIoManager.delete(fileReference);
- }
-
- @Override
- public void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException {
- boolean existsLocally = localIoManager.exists(fileReference);
- cloudClient.write(bucket, fileReference.getRelativePath(), bytes);
- localIoManager.overwrite(fileReference, bytes);
- if (!existsLocally) {
- decrementNumberOfUncachedFiles();
- }
- }
-
- protected void decrementNumberOfUncachedFiles() {
- replaceAccessor(numberOfUncachedFiles.decrementAndGet());
- }
-
- protected void decrementNumberOfUncachedFiles(int count) {
- if (count > 0) {
- replaceAccessor(numberOfUncachedFiles.addAndGet(-count));
- }
- }
-
private boolean isInNodePartition(String path) {
return partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(path));
}
- void replaceAccessor(int remainingUncached) {
- if (remainingUncached > 0) {
- // Some files still not cached yet
- return;
- }
-
- if (remainingUncached < 0) {
- // This should not happen, log in case that happen
- LOGGER.warn("Some files were downloaded multiple times. Reported remaining uncached files = {}",
- remainingUncached);
- }
+ private void replace() {
+ cacher.close();
replacer.replace();
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
index aee916c..08cbf43 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
@@ -18,20 +18,11 @@
*/
package org.apache.asterix.cloud.util;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Set;
-import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.control.nc.io.FileHandle;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -42,32 +33,12 @@
private CloudFileUtil() {
}
- public static void downloadFile(IOManager ioManager, ICloudClient cloudClient, String bucket, FileHandle fileHandle,
- IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode, ByteBuffer writeBuffer)
- throws HyracksDataException {
- FileReference fileRef = fileHandle.getFileReference();
- File file = fileRef.getFile();
-
- try (InputStream inputStream = cloudClient.getObjectStream(bucket, fileRef.getRelativePath())) {
- FileUtils.createParentDirectories(file);
- if (!file.createNewFile()) {
- throw new IllegalStateException("Couldn't create local file");
- }
-
- fileHandle.open(rwMode, syncMode);
- writeToFile(ioManager, fileHandle, inputStream, writeBuffer);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
-
public static void cleanDirectoryFiles(IOManager ioManager, Set<String> cloudFiles, FileReference partitionPath)
throws HyracksDataException {
// First get the set of local files
Set<FileReference> localFiles = ioManager.list(partitionPath);
Iterator<FileReference> localFilesIter = localFiles.iterator();
- LOGGER.info("Cleaning partition {}. Total number of unchecked cloud files {}", partitionPath.getRelativePath(),
- cloudFiles.size());
+ LOGGER.info("Cleaning partition {}.", partitionPath.getRelativePath());
// Reconcile local files and cloud files
while (localFilesIter.hasNext()) {
@@ -98,39 +69,6 @@
}
}
- private static void writeToFile(IOManager ioManager, IFileHandle fileHandle, InputStream inStream,
- ByteBuffer writeBuffer) throws HyracksDataException {
- writeBuffer.clear();
- try {
- int position = 0;
- long offset = 0;
- int read;
- while ((read = inStream.read(writeBuffer.array(), position, writeBuffer.remaining())) >= 0) {
- position += read;
- writeBuffer.position(position);
- if (writeBuffer.remaining() == 0) {
- offset += writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
- position = 0;
- }
- }
-
- if (writeBuffer.position() > 0) {
- writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
- ioManager.sync(fileHandle, true);
- }
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
-
- private static long writeBufferToFile(IOManager ioManager, IFileHandle fileHandle, ByteBuffer writeBuffer,
- long offset) throws HyracksDataException {
- writeBuffer.flip();
- long written = ioManager.doSyncWrite(fileHandle, offset, writeBuffer);
- writeBuffer.clear();
- return written;
- }
-
private static void logDeleteFile(FileReference fileReference) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deleting {} from the local cache as {} doesn't exist in the cloud", fileReference,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 9702b18..418e171 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.common.utils;
+import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
import java.io.File;
import java.nio.file.Paths;
import java.util.Iterator;
@@ -43,6 +46,7 @@
private static final Logger LOGGER = LogManager.getLogger();
public static final char DATAVERSE_CONTINUATION_MARKER = '^';
+ private static final String PARTITION_PATH = STORAGE_ROOT_DIR_NAME + File.separator + PARTITION_DIR_PREFIX;
private StoragePathUtil() {
}
@@ -71,8 +75,7 @@
}
public static String prepareStoragePartitionPath(int partitonId) {
- return Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partitonId)
- .toString();
+ return Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, PARTITION_DIR_PREFIX + partitonId).toString();
}
public static String prepareIngestionLogPath() {
@@ -103,8 +106,7 @@
}
public static int getPartitionNumFromRelativePath(String relativePath) {
- int startIdx = relativePath.lastIndexOf(StorageConstants.PARTITION_DIR_PREFIX)
- + StorageConstants.PARTITION_DIR_PREFIX.length();
+ int startIdx = relativePath.lastIndexOf(PARTITION_DIR_PREFIX) + PARTITION_DIR_PREFIX.length();
int partitionEndIdx = relativePath.indexOf(File.separatorChar, startIdx);
int idxEnd = partitionEndIdx != -1 ? partitionEndIdx : relativePath.length();
String partition = relativePath.substring(startIdx, idxEnd);
@@ -183,4 +185,32 @@
public static FileReference getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
return ioManager.resolve(ref.getRelativePath().toString());
}
+
+ /**
+ * Returns the index's path after the partition directory
+ * Example:
+ * - Input:
+ * /../storage/partition_8/dataverse_p1[/^dataverse_p2[/^dataverse_p3...]]/dataset/rebalanceCount/index/0_0_b
+ * - Output:
+ * dataverse_p1[/^dataverse_p2[/^dataverse_p3...]]/dataset/rebalanceCount/index
+ *
+ * @param fileReference a file inside the index directory
+ * @param isDirectory if the provided {@link FileReference} corresponds to a directory
+ * @return index path
+ */
+ public static String getIndexSubPath(FileReference fileReference, boolean isDirectory) {
+ String relativePath = fileReference.getRelativePath();
+ if (relativePath.length() <= PARTITION_PATH.length() || !relativePath.startsWith(PARTITION_PATH)) {
+ return "";
+ }
+ String partition = PARTITION_PATH + getPartitionNumFromRelativePath(relativePath);
+ int partitionStart = relativePath.indexOf(partition);
+ int start = partitionStart + partition.length() + 1;
+ int end = isDirectory ? relativePath.length() : relativePath.lastIndexOf('/');
+ if (start >= end) {
+ // This could happen if the provided path contains only a partition path (e.g., storage/partition_0)
+ return "";
+ }
+ return relativePath.substring(start, end);
+ }
}
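
Reviewer note: expected results of getIndexSubPath under the
javadoc's example layout. The dataverse/dataset/index names and
Unix-style separators are hypothetical; only the
storage/partition_<n> prefix comes from StorageConstants:

    // Resolve hypothetical relative paths through the node's IOManager.
    FileReference componentFile = ioManager.resolve("storage/partition_8/dv/ds/0/ds_idx/0_0_b");
    FileReference indexDir = ioManager.resolve("storage/partition_8/dv/ds/0/ds_idx");

    StoragePathUtil.getIndexSubPath(componentFile, false); // -> "dv/ds/0/ds_idx"
    StoragePathUtil.getIndexSubPath(indexDir, true);       // -> "dv/ds/0/ds_idx"
    // A partition-only path yields no index sub-path:
    StoragePathUtil.getIndexSubPath(ioManager.resolve("storage/partition_8"), true); // -> ""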
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 161374c..4f990ea 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -91,6 +91,7 @@
<jacoco.version>0.7.6.201602180812</jacoco.version>
<log4j.version>2.19.0</log4j.version>
<awsjavasdk.version>2.20.135</awsjavasdk.version>
+ <awsjavasdk.crt.version>0.27.1</awsjavasdk.crt.version>
<parquet.version>1.12.3</parquet.version>
<hadoop-awsjavasdk.version>1.12.402</hadoop-awsjavasdk.version>
<azureblobjavasdk.version>12.22.0</azureblobjavasdk.version>
@@ -1641,6 +1642,11 @@
<artifactId>sts</artifactId>
<version>${awsjavasdk.version}</version>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk.crt</groupId>
+ <artifactId>aws-crt</artifactId>
+ <version>${awsjavasdk.crt.version}</version>
+ </dependency>
<!-- Mock for AWS S3 -->
<dependency>
<groupId>io.findify</groupId>