[ASTERIXDB-3387][STO]: Prepare asterix-cloud for unlimited storage
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Prepare asterix-cloud for supporting 'Selective' accessor,
where a file can have holes or can be evicted entirely.
Change-Id: I6653bbc780bb33e900170a65b4a7334c1c97a80e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18248
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 36cb039..659a6d7 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -137,6 +137,11 @@
<dependencies>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-cloud</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-common</artifactId>
<version>${project.version}</version>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 7ab8a5a..8cf5ac5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
@@ -33,6 +34,7 @@
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack;
import org.apache.asterix.cloud.clients.CloudClientProvider;
+import org.apache.asterix.cloud.clients.CloudFile;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.cloud.util.CloudFileUtil;
@@ -47,6 +49,7 @@
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -54,7 +57,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-public abstract class AbstractCloudIOManager extends IOManager implements IPartitionBootstrapper {
+public abstract class AbstractCloudIOManager extends IOManager implements IPartitionBootstrapper, ICloudIOManager {
private static final Logger LOGGER = LogManager.getLogger();
protected final ICloudClient cloudClient;
protected final IWriteBufferProvider writeBufferProvider;
@@ -86,8 +89,8 @@
@Override
public IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() {
- Set<String> existingMetadataFiles = getCloudMetadataPartitionFiles();
- String bootstrapMarkerPath = StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver);
+ Set<CloudFile> existingMetadataFiles = getCloudMetadataPartitionFiles();
+ CloudFile bootstrapMarkerPath = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
if (existingMetadataFiles.isEmpty() || existingMetadataFiles.contains(bootstrapMarkerPath)) {
LOGGER.info("First time to initialize this cluster: systemState = PERMANENT_DATA_LOSS");
return IRecoveryManager.SystemState.PERMANENT_DATA_LOSS;
@@ -122,6 +125,7 @@
deleteUnkeptPartitionDirs(currentOnDiskPartitions);
cleanupLocalFiles();
}
+
// Has different implementations depending on the caching policy
downloadPartitions(metadataNode, metadataPartition);
}
@@ -138,7 +142,7 @@
}
private void cleanupLocalFiles() throws HyracksDataException {
- Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER);
+ Set<CloudFile> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER);
if (cloudFiles.isEmpty()) {
LOGGER.warn("No files in the cloud. Deleting all local files in partitions {}...", partitions);
for (FileReference partitionPath : partitionPaths) {
@@ -159,6 +163,53 @@
/*
* ******************************************************************
+ * ICloudIOManager functions
+ * ******************************************************************
+ */
+
+ @Override
+ public final void cloudRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+ }
+
+ @Override
+ public final InputStream cloudRead(IFileHandle fHandle, long offset, long length) {
+ return cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length);
+ }
+
+ @Override
+ public final int localWriter(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ return localIoManager.doSyncWrite(fHandle, offset, data);
+ }
+
+ @Override
+ public final int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException {
+ ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
+ int writtenBytes;
+ try {
+ writtenBytes = cloudWriter.write(data);
+ } catch (HyracksDataException e) {
+ cloudWriter.abort();
+ throw e;
+ }
+ return writtenBytes;
+ }
+
+ @Override
+ public final long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException {
+ ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
+ int writtenBytes;
+ try {
+ writtenBytes = cloudWriter.write(data[0], data[1]);
+ } catch (HyracksDataException e) {
+ cloudWriter.abort();
+ throw e;
+ }
+ return writtenBytes;
+ }
+
+ /*
+ * ******************************************************************
* IIOManager functions
* ******************************************************************
*/
@@ -166,7 +217,7 @@
@Override
public final IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
throws HyracksDataException {
- ICloudWriter cloudWriter = cloudClient.createdWriter(bucket, fileRef.getRelativePath(), writeBufferProvider);
+ ICloudWriter cloudWriter = cloudClient.createWriter(bucket, fileRef.getRelativePath(), writeBufferProvider);
CloudFileHandle fHandle = new CloudFileHandle(fileRef, cloudWriter);
onOpen(fHandle);
try {
@@ -188,26 +239,14 @@
public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray)
throws HyracksDataException {
long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
- ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
- try {
- cloudWriter.write(dataArray[0], dataArray[1]);
- } catch (HyracksDataException e) {
- cloudWriter.abort();
- throw e;
- }
+ cloudWrite(fHandle, dataArray);
return writtenBytes;
}
@Override
public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer dataArray) throws HyracksDataException {
int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
- ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
- try {
- cloudWriter.write(dataArray);
- } catch (HyracksDataException e) {
- cloudWriter.abort();
- throw e;
- }
+ cloudWrite(fHandle, dataArray);
return writtenBytes;
}
@@ -294,24 +333,24 @@
cloudClient.write(bucket, key, bytes);
}
- private Set<String> getCloudMetadataPartitionFiles() {
+ private Set<CloudFile> getCloudMetadataPartitionFiles() {
String metadataNamespacePath = StoragePathUtil.getNamespacePath(nsPathResolver,
MetadataConstants.METADATA_NAMESPACE, METADATA_PARTITION);
return cloudClient.listObjects(bucket, metadataNamespacePath, IoUtil.NO_OP_FILTER);
}
private void ensureCompleteMetadataBootstrap() throws HyracksDataException {
- Set<String> metadataPartitionFiles = getCloudMetadataPartitionFiles();
- boolean foundBootstrapMarker =
- metadataPartitionFiles.contains(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
+ Set<CloudFile> metadataPartitionFiles = getCloudMetadataPartitionFiles();
+ CloudFile marker = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
+ boolean foundBootstrapMarker = metadataPartitionFiles.contains(marker);
// if the bootstrap file exists, we failed to bootstrap --> delete all partial files in metadata partition
if (foundBootstrapMarker) {
LOGGER.info(
"detected failed bootstrap attempted, deleting all existing files in the metadata partition: {}",
metadataPartitionFiles);
IIOBulkOperation deleteBulkOperation = createDeleteBulkOperation();
- for (String file : metadataPartitionFiles) {
- deleteBulkOperation.add(resolve(file));
+ for (CloudFile file : metadataPartitionFiles) {
+ deleteBulkOperation.add(resolve(file.getPath()));
}
performBulkOperation(deleteBulkOperation);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 0b4200c..04979e6 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
@@ -94,4 +95,14 @@
cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
localIoManager.overwrite(fileRef, bytes);
}
+
+ @Override
+ public int punchHole(IFileHandle fHandle, long offset, long length) {
+ throw new UnsupportedOperationException("punchHole is not supported with Eager caching");
+ }
+
+ @Override
+ public void evict(FileReference directory) {
+ throw new UnsupportedOperationException("evict is not supported with Eager caching");
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
index 693b73a..6a0a023 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
@@ -20,8 +20,19 @@
import java.nio.ByteBuffer;
+/**
+ * Buffer provider and recycler
+ */
public interface IWriteBufferProvider {
+ /**
+ * @return a buffer
+ */
ByteBuffer getBuffer();
+ /**
+ * Return the buffer to be reused
+ *
+ * @param buffer to recycle
+ */
void recycle(ByteBuffer buffer);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index cb47d00..fa4cd56 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.cloud;
-import static org.apache.asterix.cloud.lazy.ParallelCacher.METADATA_FILTER;
+import static org.apache.asterix.cloud.util.CloudFileUtil.METADATA_FILTER;
import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
@@ -32,6 +32,7 @@
import java.util.stream.Collectors;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
+import org.apache.asterix.cloud.clients.CloudFile;
import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.cloud.lazy.ParallelCacher;
import org.apache.asterix.cloud.lazy.accessor.ILazyAccessor;
@@ -44,6 +45,7 @@
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.IoUtil;
@@ -85,17 +87,17 @@
protected synchronized void downloadPartitions(boolean metadataNode, int metadataPartition)
throws HyracksDataException {
// Get the files in all relevant partitions from the cloud
- Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
- .filter(f -> partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f)))
+ Set<CloudFile> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER).stream()
+ .filter(f -> partitions.contains(StoragePathUtil.getPartitionNumFromRelativePath(f.getPath())))
.collect(Collectors.toSet());
// Get all files stored locally
- Set<String> localFiles = new HashSet<>();
+ Set<CloudFile> localFiles = new HashSet<>();
for (IODeviceHandle deviceHandle : getIODevices()) {
FileReference storageRoot = deviceHandle.createFileRef(STORAGE_ROOT_DIR_NAME);
Set<FileReference> deviceFiles = localIoManager.list(storageRoot, IoUtil.NO_OP_FILTER);
for (FileReference fileReference : deviceFiles) {
- localFiles.add(fileReference.getRelativePath());
+ localFiles.add(CloudFile.of(fileReference.getRelativePath()));
}
}
@@ -113,7 +115,7 @@
// Download all metadata files to avoid (List) calls to the cloud when listing/reading these files
downloadMetadataFiles(downloader, uncachedFiles);
// Create a parallel cacher which download and monitor all uncached files
- ParallelCacher cacher = new ParallelCacher(downloader, uncachedFiles);
+ ParallelCacher cacher = new ParallelCacher(downloader, uncachedFiles, true);
// Local cache misses some files, cloud-based accessor is needed for read operations
accessor = new ReplaceableCloudAccessor(cloudClient, bucket, localIoManager, partitions, replacer, cacher);
} else {
@@ -183,14 +185,31 @@
log("WRITE", fileRef);
}
- private List<FileReference> resolve(Set<String> cloudFiles) throws HyracksDataException {
+ @Override
+ public int punchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+ // TODO implement for Selective accessor
+ return -1;
+ }
+
+ @Override
+ public void evict(FileReference directory) throws HyracksDataException {
+ // TODO implement for Selective accessor
+ }
+
+ private List<FileReference> resolve(Set<CloudFile> cloudFiles) throws HyracksDataException {
List<FileReference> fileReferences = new ArrayList<>();
- for (String file : cloudFiles) {
+ for (CloudFile file : cloudFiles) {
fileReferences.add(resolve(file));
}
return fileReferences;
}
+ private FileReference resolve(CloudFile file) throws HyracksDataException {
+ String path = file.getPath();
+ IODeviceHandle devHandle = getDeviceComputer().resolve(path, getIODevices());
+ return new UncachedFileReference(devHandle, path, file.getSize());
+ }
+
private void log(String op, FileReference fileReference) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} {}", op, fileReference.getRelativePath());
@@ -199,7 +218,7 @@
private void downloadMetadataFiles(IParallelDownloader downloader, List<FileReference> uncachedFiles)
throws HyracksDataException {
- Set<FileReference> uncachedMetadataFiles = ParallelCacher.getFiles(uncachedFiles, METADATA_FILTER);
+ Set<FileReference> uncachedMetadataFiles = ParallelCacher.getFiles(uncachedFiles, METADATA_FILTER).keySet();
if (!uncachedMetadataFiles.isEmpty()) {
LOGGER.debug("Downloading metadata files for all partitions; current uncached files: {}", uncachedFiles);
downloader.downloadFiles(uncachedMetadataFiles);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/UncachedFileReference.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/UncachedFileReference.java
new file mode 100644
index 0000000..1f1891d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/UncachedFileReference.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IODeviceHandle;
+
+public class UncachedFileReference extends FileReference {
+ private static final long serialVersionUID = -920468777697935759L;
+ private final long size;
+
+ public UncachedFileReference(IODeviceHandle dev, String path, long size) {
+ super(dev, path);
+ this.size = size;
+ }
+
+ private UncachedFileReference(FileReference fileReference) {
+ this(fileReference.getDeviceHandle(), fileReference.getRelativePath(), fileReference.getFile().length());
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ /**
+ * Convert a collection of {@link FileReference} to a collection of {@link UncachedFileReference}
+ * NOTE: Requires files to exist on the local drive
+ *
+ * @param files Collection of {@link FileReference} to convert to {@link UncachedFileReference}
+ * @return converted collection of file references
+ */
+ public static Collection<FileReference> toUncached(Collection<FileReference> files) {
+ List<FileReference> uncached = new ArrayList<>();
+ for (FileReference fileReference : files) {
+ if (!fileReference.getFile().exists()) {
+ throw new IllegalStateException(fileReference.getAbsolutePath() + " does not exist");
+ }
+ uncached.add(new UncachedFileReference(fileReference));
+ }
+
+ return uncached;
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudFile.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudFile.java
new file mode 100644
index 0000000..e1cf135
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudFile.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+public final class CloudFile {
+ private final String path;
+ private final long size;
+
+ private CloudFile(String path, long size) {
+ this.path = path;
+ this.size = size;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof CloudFile)) {
+ return false;
+ }
+
+ CloudFile other = (CloudFile) obj;
+ return path.equals(other.path);
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+
+ public static CloudFile of(String path, long size) {
+ return new CloudFile(path, size);
+ }
+
+ public static CloudFile of(String path) {
+ return new CloudFile(path, 0);
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 0307846..fb50dc9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -49,7 +49,7 @@
* @param bufferProvider buffer provider
* @return cloud writer
*/
- ICloudWriter createdWriter(String bucket, String path, IWriteBufferProvider bufferProvider);
+ ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider);
/**
* Lists objects at the specified bucket and path, and applies the file name filter on the returned objects
@@ -59,7 +59,7 @@
* @param filter filter to apply
* @return file names returned after applying the file name filter
*/
- Set<String> listObjects(String bucket, String path, FilenameFilter filter);
+ Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter);
/**
* Performs a range-read from the specified bucket and path starting at the offset. The amount read is equal to the
@@ -88,9 +88,11 @@
*
* @param bucket bucket
* @param path path
- * @return inputstream
+ * @param offset offset
+ * @param length length
+ * @return input stream of requested range
*/
- InputStream getObjectStream(String bucket, String path);
+ InputStream getObjectStream(String bucket, String path, long offset, long length);
/**
* Writes the content of the byte array into the bucket at the specified path
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 9c31e17..38a6801 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -37,6 +37,7 @@
import org.apache.asterix.cloud.CloudResettableInputStream;
import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.CloudFile;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.ICloudWriter;
@@ -98,13 +99,13 @@
}
@Override
- public ICloudWriter createdWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
+ public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
ICloudBufferedWriter bufferedWriter = new S3BufferedWriter(s3Client, profiler, bucket, path);
return new CloudResettableInputStream(bufferedWriter, bufferProvider);
}
@Override
- public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+ public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
profiler.objectsList();
path = config.isLocalS3Provider() ? encodeURI(path) : path;
return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
@@ -152,9 +153,11 @@
}
@Override
- public InputStream getObjectStream(String bucket, String path) {
+ public InputStream getObjectStream(String bucket, String path, long offset, long length) {
profiler.objectGet();
- GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
+ long readTo = offset + length;
+ GetObjectRequest getReq =
+ GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
try {
return s3Client.getObject(getReq);
} catch (NoSuchKeyException e) {
@@ -167,8 +170,6 @@
public void write(String bucket, String path, byte[] data) {
profiler.objectWrite();
PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(path).build();
-
- // TODO(htowaileb): add retry logic here
s3Client.putObject(putReq, RequestBody.fromBytes(data));
}
@@ -281,12 +282,12 @@
return builder.build();
}
- private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
- Set<String> files = new HashSet<>();
+ private Set<CloudFile> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
+ Set<CloudFile> files = new HashSet<>();
for (S3Object s3Object : contents) {
String path = config.isLocalS3Provider() ? S3ClientUtils.decodeURI(s3Object.key()) : s3Object.key();
if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
- files.add(path);
+ files.add(CloudFile.of(path, s3Object.size()));
}
}
return files;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index c725ca5..9eb3fe0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -33,6 +33,7 @@
import java.util.Set;
import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.CloudFile;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.cloud.clients.IParallelDownloader;
@@ -86,20 +87,20 @@
}
@Override
- public ICloudWriter createdWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
+ public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
return new GCSWriter(bucket, path, gcsClient, profiler);
}
@Override
- public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+ public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
profiler.objectsList();
Page<Blob> blobs =
gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE));
- Set<String> files = new HashSet<>();
+ Set<CloudFile> files = new HashSet<>();
for (Blob blob : blobs.iterateAll()) {
if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
- files.add(blob.getName());
+ files.add(CloudFile.of(blob.getName(), blob.getSize()));
}
}
return files;
@@ -138,12 +139,13 @@
}
@Override
- public InputStream getObjectStream(String bucket, String path) {
+ public InputStream getObjectStream(String bucket, String path, long offset, long length) {
profiler.objectGet();
- try (ReadChannel reader = gcsClient.reader(bucket, path)) {
+ try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) {
+ reader.seek(offset);
return Channels.newInputStream(reader);
- } catch (StorageException ex) {
- throw new IllegalStateException(ex);
+ } catch (StorageException | IOException e) {
+ throw new IllegalStateException(e);
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
index e3d978d..aebeab2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/IParallelCacher.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.cloud.lazy;
+import java.io.File;
import java.io.FilenameFilter;
import java.util.Collection;
import java.util.Set;
@@ -27,23 +28,51 @@
public interface IParallelCacher {
/**
- * Check whether an index file data and metadata are already cached
+ * Check whether a file is cacheable or not
*
- * @param indexDir index directory
- * @return true if the index is already cached, false otherwise
+ * @param fileReference file
+ * @return true if the file is already cached, false otherwise
*/
- boolean isCached(FileReference indexDir);
+ boolean isCacheable(FileReference fileReference);
+ /**
+ * Returns a list of all uncached files
+ *
+ * @param dir directory to list
+ * @param filter file name filter
+ * @return set of uncached files
+ */
Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter);
/**
+ * Returns the size of a file
+ *
+ * @param fileReference file
+ * @return the size of the file if it exists or zero otherwise (as expected when calling {@link File#length()})
+ */
+ long getSize(FileReference fileReference);
+
+ /**
+ * @return total size of uncached files
+ */
+ long getUncachedTotalSize();
+
+ /**
+ * Creates empty data files
+ *
+ * @param indexFile a file reference in an index
+ * @return true if the remaining number of uncached files is zero, false otherwise
+ */
+ boolean createEmptyDataFiles(FileReference indexFile) throws HyracksDataException;
+
+ /**
* Downloads all index's data files for all partitions.
* The index is inferred from the path of the provided file.
*
* @param indexFile a file reference in an index
* @return true if the remaining number of uncached files is zero, false otherwise
*/
- boolean downloadData(FileReference indexFile) throws HyracksDataException;
+ boolean downloadDataFiles(FileReference indexFile) throws HyracksDataException;
/**
* Downloads all index's metadata files for all partitions.
@@ -52,7 +81,7 @@
* @param indexFile a file reference in an index
* @return true if the remaining number of uncached files is zero, false otherwise
*/
- boolean downloadMetadata(FileReference indexFile) throws HyracksDataException;
+ boolean downloadMetadataFiles(FileReference indexFile) throws HyracksDataException;
/**
* Remove the deleted files from the uncached file set
@@ -63,12 +92,19 @@
boolean remove(Collection<FileReference> deletedFiles);
/**
- * Remove the deleted file from the uncached file set
+ * Remove a file from the uncached file set
*
- * @param deletedFile the deleted file
+ * @param fileReference the deleted file
* @return true if the remaining number of uncached files is zero, false otherwise
*/
- boolean remove(FileReference deletedFile);
+ boolean remove(FileReference fileReference);
+
+ /**
+ * Add files to indicate that they are not cached anymore
+ *
+ * @param files to be uncached
+ */
+ void add(Collection<FileReference> files);
/**
* Close cacher resources
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
index a77652c..c953de5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/NoOpParallelCacher.java
@@ -23,13 +23,14 @@
import java.util.Collections;
import java.util.Set;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
public class NoOpParallelCacher implements IParallelCacher {
public static final IParallelCacher INSTANCE = new NoOpParallelCacher();
@Override
- public boolean isCached(FileReference indexDir) {
+ public boolean isCacheable(FileReference fileReference) {
return false;
}
@@ -39,12 +40,27 @@
}
@Override
- public boolean downloadData(FileReference indexFile) {
+ public long getSize(FileReference fileReference) {
+ return 0L;
+ }
+
+ @Override
+ public long getUncachedTotalSize() {
+ return 0L;
+ }
+
+ @Override
+ public boolean createEmptyDataFiles(FileReference indexFile) throws HyracksDataException {
return false;
}
@Override
- public boolean downloadMetadata(FileReference indexFile) {
+ public boolean downloadDataFiles(FileReference indexFile) {
+ return false;
+ }
+
+ @Override
+ public boolean downloadMetadataFiles(FileReference indexFile) {
return false;
}
@@ -54,11 +70,16 @@
}
@Override
- public boolean remove(FileReference deletedFile) {
+ public boolean remove(FileReference fileReference) {
return false;
}
@Override
+ public void add(Collection<FileReference> files) {
+ // NoOp
+ }
+
+ @Override
public void close() {
// NoOp
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 95330d9..bd6644c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -18,21 +18,28 @@
*/
package org.apache.asterix.cloud.lazy;
-import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
+import static org.apache.asterix.cloud.util.CloudFileUtil.DATA_FILTER;
+import static org.apache.asterix.cloud.util.CloudFileUtil.METADATA_FILTER;
import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import org.apache.asterix.cloud.UncachedFileReference;
import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -42,95 +49,108 @@
* @see org.apache.asterix.cloud.lazy.accessor.ReplaceableCloudAccessor
*/
public final class ParallelCacher implements IParallelCacher {
-
- public static final FilenameFilter METADATA_FILTER =
- ((dir, name) -> name.startsWith(StorageConstants.INDEX_NON_DATA_FILES_PREFIX));
private static final Logger LOGGER = LogManager.getLogger();
private final IParallelDownloader downloader;
/**
* Uncached Indexes subpaths
*/
private final Set<String> uncachedIndexes;
+ private final boolean checkEmpty;
/**
* All uncached data files
* Example: BTree files
*/
- private final Set<FileReference> uncachedDataFiles;
+ private final Map<FileReference, UncachedFileReference> uncachedDataFiles;
/**
* All uncached metadata files
* Example: index checkpoint files
*/
- private final Set<FileReference> uncachedMetadataFiles;
+ private final Map<FileReference, UncachedFileReference> uncachedMetadataFiles;
- public ParallelCacher(IParallelDownloader downloader, List<FileReference> uncachedFiles) {
+ public ParallelCacher(IParallelDownloader downloader, List<FileReference> uncachedFiles, boolean checkEmpty) {
this.downloader = downloader;
- uncachedDataFiles = getFiles(uncachedFiles, COMPONENT_FILES_FILTER);
- uncachedMetadataFiles = getFiles(uncachedFiles, METADATA_FILTER);
+ uncachedDataFiles = new ConcurrentHashMap<>(getFiles(uncachedFiles, DATA_FILTER));
+ uncachedMetadataFiles = new ConcurrentHashMap<>(getFiles(uncachedFiles, METADATA_FILTER));
uncachedIndexes = getUncachedIndexes(uncachedFiles);
+ this.checkEmpty = checkEmpty;
}
@Override
- public boolean isCached(FileReference indexDir) {
- String relativePath = indexDir.getRelativePath();
- if (relativePath.endsWith(StorageConstants.STORAGE_ROOT_DIR_NAME)
- || relativePath.startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)) {
- return false;
+ public boolean isCacheable(FileReference fileReference) {
+ if (isDataFile(fileReference)) {
+ return uncachedDataFiles.containsKey(fileReference);
+ } else {
+ return uncachedMetadataFiles.containsKey(fileReference);
}
- String indexSubPath = StoragePathUtil.getIndexSubPath(indexDir, true);
- return !indexSubPath.isEmpty() && !uncachedIndexes.contains(indexSubPath);
}
@Override
public Set<FileReference> getUncachedFiles(FileReference dir, FilenameFilter filter) {
if (dir.getRelativePath().endsWith(StorageConstants.STORAGE_ROOT_DIR_NAME)) {
- return uncachedDataFiles.stream()
+ return uncachedDataFiles.keySet().stream()
.filter(f -> StoragePathUtil.hasSameStorageRoot(dir, f) && filter.accept(null, f.getName()))
.collect(Collectors.toSet());
}
- return uncachedDataFiles
+ return uncachedDataFiles.keySet()
.stream().filter(f -> StoragePathUtil.hasSameStorageRoot(dir, f)
&& StoragePathUtil.isRelativeParent(dir, f) && filter.accept(null, f.getName()))
.collect(Collectors.toSet());
}
@Override
- public synchronized boolean downloadData(FileReference indexFile) throws HyracksDataException {
- String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
- Set<FileReference> toDownload = new HashSet<>();
- for (FileReference fileReference : uncachedDataFiles) {
- if (fileReference.getRelativePath().contains(indexSubPath)) {
- toDownload.add(fileReference.getParent());
- }
+ public long getSize(FileReference fileReference) {
+ UncachedFileReference uncachedFile;
+ if (isDataFile(fileReference)) {
+ uncachedFile = uncachedDataFiles.get(fileReference);
+ } else {
+ uncachedFile = uncachedMetadataFiles.get(fileReference);
}
- LOGGER.debug("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
- Collection<FileReference> failed = downloader.downloadDirectories(toDownload);
- if (!failed.isEmpty()) {
- LOGGER.warn("Failed to download data files {}. Re-downloading: {}", indexSubPath, failed);
- downloader.downloadFiles(failed);
- }
- LOGGER.debug("Finished downloading data files for {}", indexSubPath);
+ return uncachedFile == null ? 0L : uncachedFile.getSize();
+ }
+
+ @Override
+ public long getUncachedTotalSize() {
+ return getTotalSize(uncachedMetadataFiles.values()) + getTotalSize(uncachedDataFiles.values());
+ }
+
+ @Override
+ public synchronized boolean createEmptyDataFiles(FileReference indexFile) throws HyracksDataException {
+ String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
+ Set<FileReference> toCreate = getForAllPartitions(uncachedDataFiles.values(), indexSubPath);
+
+ LOGGER.debug("Creating empty data files for {} in all partitions: {}", indexSubPath, toCreate);
+ createEmptyFiles(toCreate);
+ LOGGER.debug("Finished creating data files for {}", indexSubPath);
uncachedIndexes.remove(indexSubPath);
- uncachedDataFiles.removeIf(f -> f.getRelativePath().contains(indexSubPath));
+ uncachedDataFiles.keySet().removeIf(f -> f.getRelativePath().contains(indexSubPath));
return isEmpty();
}
@Override
- public synchronized boolean downloadMetadata(FileReference indexFile) throws HyracksDataException {
+ public synchronized boolean downloadDataFiles(FileReference indexFile) throws HyracksDataException {
String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
- Set<FileReference> toDownload = new HashSet<>();
- for (FileReference fileReference : uncachedMetadataFiles) {
- if (fileReference.getRelativePath().contains(indexSubPath)) {
- toDownload.add(fileReference);
- }
- }
+ Set<FileReference> toDownload = getForAllPartitions(uncachedDataFiles.values(), indexSubPath);
+
+ LOGGER.debug("Downloading data files for {} in all partitions: {}", indexSubPath, toDownload);
+ downloader.downloadFiles(toDownload);
+ LOGGER.debug("Finished downloading data files for {}", indexSubPath);
+ uncachedIndexes.remove(indexSubPath);
+ uncachedDataFiles.keySet().removeIf(f -> f.getRelativePath().contains(indexSubPath));
+ return isEmpty();
+ }
+
+ @Override
+ public synchronized boolean downloadMetadataFiles(FileReference indexFile) throws HyracksDataException {
+ String indexSubPath = StoragePathUtil.getIndexSubPath(indexFile, false);
+ Set<FileReference> toDownload = getForAllPartitions(uncachedMetadataFiles.keySet(), indexSubPath);
LOGGER.debug("Downloading metadata files for {} in all partitions: {}", indexSubPath, toDownload);
downloader.downloadFiles(toDownload);
LOGGER.debug("Finished downloading metadata files for {}", indexSubPath);
- uncachedMetadataFiles.removeAll(toDownload);
+ uncachedMetadataFiles.keySet().removeAll(toDownload);
return isEmpty();
}
@@ -145,28 +165,36 @@
}
@Override
- public boolean remove(FileReference deletedFile) {
- LOGGER.info("Deleting {}", deletedFile);
- if (COMPONENT_FILES_FILTER.accept(null, deletedFile.getName())) {
- uncachedDataFiles.remove(deletedFile);
+ public boolean remove(FileReference fileReference) {
+ LOGGER.info("Deleting {}", fileReference);
+ if (isDataFile(fileReference)) {
+ uncachedDataFiles.remove(fileReference);
} else {
- uncachedMetadataFiles.remove(deletedFile);
+ uncachedMetadataFiles.remove(fileReference);
}
return isEmpty();
}
@Override
+ public synchronized void add(Collection<FileReference> files) {
+ LOGGER.info("Uncache {}", files);
+ uncachedDataFiles.putAll(getFiles(files, DATA_FILTER));
+ uncachedMetadataFiles.putAll(getFiles(files, METADATA_FILTER));
+ }
+
+ @Override
public void close() throws HyracksDataException {
downloader.close();
LOGGER.info("Parallel cacher was closed");
}
- public static Set<FileReference> getFiles(List<FileReference> uncachedFiles, FilenameFilter filter) {
- Set<FileReference> fileReferences = ConcurrentHashMap.newKeySet();
+ public static Map<FileReference, UncachedFileReference> getFiles(Collection<FileReference> uncachedFiles,
+ FilenameFilter filter) {
+ Map<FileReference, UncachedFileReference> fileReferences = new HashMap<>();
for (FileReference fileReference : uncachedFiles) {
if (filter.accept(null, fileReference.getName())) {
- fileReferences.add(fileReference);
+ fileReferences.put(fileReference, (UncachedFileReference) fileReference);
}
}
return fileReferences;
@@ -180,9 +208,48 @@
return uncachedIndexes;
}
+ private boolean isDataFile(FileReference fileReference) {
+ return DATA_FILTER.accept(null, fileReference.getName());
+ }
+
private synchronized boolean isEmpty() {
+ if (!checkEmpty) {
+ return false;
+ }
int totalSize = uncachedDataFiles.size() + uncachedMetadataFiles.size();
LOGGER.info("Current number of uncached files {}", totalSize);
return totalSize == 0;
}
+
+ private static Set<FileReference> getForAllPartitions(Collection<? extends FileReference> uncachedFiles,
+ String indexSubPath) {
+ Set<FileReference> allFiles = new HashSet<>();
+ for (FileReference fileReference : uncachedFiles) {
+ if (fileReference.getRelativePath().contains(indexSubPath)) {
+ allFiles.add(fileReference);
+ }
+ }
+
+ return allFiles;
+ }
+
+ private static long getTotalSize(Collection<UncachedFileReference> fileReferences) {
+ long size = 0L;
+ for (UncachedFileReference uncached : fileReferences) {
+ size += uncached.getSize();
+ }
+ return size;
+ }
+
+ private static void createEmptyFiles(Set<FileReference> toCreate) throws HyracksDataException {
+ for (FileReference fileReference : toCreate) {
+ IoUtil.create(fileReference);
+ try (RandomAccessFile raf = new RandomAccessFile(fileReference.getAbsolutePath(), "rw")) {
+ raf.setLength(((UncachedFileReference) fileReference).getSize());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
index 534ff5d..48f2ec7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java
@@ -26,22 +26,72 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+/**
+ * An abstraction for lazy I/O operations
+ */
public interface ILazyAccessor {
+ /**
+ * @return whether this is a local accessor
+ */
boolean isLocalAccessor();
+ /**
+ * @return a callback for bulk operation
+ */
IBulkOperationCallBack getBulkOperationCallBack();
+ /**
+ * Notify opening a file
+ *
+ * @param fileHandle to open
+ */
void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException;
+ /**
+ * List a directory
+ *
+ * @param dir to list
+ * @param filter filter to return only specific file
+ * @return set of all files that are in the directory and satisfy the filter
+ */
Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException;
+ /**
+ * Checks whether a file exists
+ *
+ * @param fileRef to check
+ * @return true if exists, false otherwise
+ */
boolean doExists(FileReference fileRef) throws HyracksDataException;
+ /**
+ * Get the size of a file
+ *
+ * @param fileReference to get the size of
+ * @return size in bytes
+ */
long doGetSize(FileReference fileReference) throws HyracksDataException;
+ /**
+ * Read all bytes of a file
+ *
+ * @param fileReference to read
+ * @return read bytes
+ */
byte[] doReadAllBytes(FileReference fileReference) throws HyracksDataException;
+ /**
+ * Delete a file
+ *
+ * @param fileReference to delete
+ */
void doDelete(FileReference fileReference) throws HyracksDataException;
+ /**
+ * Overwrite a file with the provided data
+ *
+ * @param fileReference to overwrite
+ * @param bytes to be written
+ */
void doOverwrite(FileReference fileReference, byte[] bytes) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
index 3a4ff8a..b560be4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessorReplacer.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.cloud.lazy.accessor;
+@FunctionalInterface
public interface ILazyAccessorReplacer {
+ /**
+ * Replace the {@link ILazyAccessor}
+ */
void replace();
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
index e4e168e..c532674 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java
@@ -24,6 +24,7 @@
import org.apache.asterix.cloud.CloudFileHandle;
import org.apache.asterix.cloud.bulk.IBulkOperationCallBack;
+import org.apache.asterix.cloud.clients.CloudFile;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.lazy.IParallelCacher;
import org.apache.asterix.common.utils.StorageConstants;
@@ -42,7 +43,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private final Set<Integer> partitions;
private final ILazyAccessorReplacer replacer;
- private final IParallelCacher cacher;
+ protected final IParallelCacher cacher;
private final IBulkOperationCallBack deleteCallBack;
public ReplaceableCloudAccessor(ICloudClient cloudClient, String bucket, IOManager localIoManager,
@@ -72,7 +73,7 @@
public void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException {
FileReference fileRef = fileHandle.getFileReference();
if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
- if (cacher.downloadData(fileRef)) {
+ if (cacher.downloadDataFiles(fileRef)) {
replace();
}
}
@@ -89,14 +90,9 @@
return localList;
}
- private static boolean isTxnDir(FileReference dir) {
- return dir.getRelativePath().startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)
- || dir.getName().equals(StorageConstants.GLOBAL_TXN_DIR_NAME);
- }
-
@Override
public boolean doExists(FileReference fileRef) throws HyracksDataException {
- return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
+ return localIoManager.exists(fileRef) || cacher.isCacheable(fileRef);
}
@Override
@@ -104,13 +100,13 @@
if (localIoManager.exists(fileReference)) {
return localIoManager.getSize(fileReference);
}
- return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+ return cacher.getSize(fileReference);
}
@Override
public byte[] doReadAllBytes(FileReference fileRef) throws HyracksDataException {
if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
- if (cacher.downloadMetadata(fileRef)) {
+ if (cacher.downloadMetadataFiles(fileRef)) {
replace();
}
}
@@ -140,7 +136,7 @@
private Set<FileReference> cloudBackedList(FileReference dir, FilenameFilter filter) throws HyracksDataException {
LOGGER.debug("CLOUD LIST: {}", dir);
- Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
+ Set<CloudFile> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
if (cloudFiles.isEmpty()) {
return Collections.emptySet();
}
@@ -150,7 +146,7 @@
// Reconcile local files and cloud files
for (FileReference file : localFiles) {
- String path = file.getRelativePath();
+ CloudFile path = CloudFile.of(file.getRelativePath());
if (!cloudFiles.contains(path)) {
throw new IllegalStateException("Local file is not clean. Offending path: " + path);
} else {
@@ -160,9 +156,9 @@
}
// Add the remaining files that are not stored locally in their designated partitions (if any)
- for (String cloudFile : cloudFiles) {
- FileReference localFile = localIoManager.resolve(cloudFile);
- if (isInNodePartition(cloudFile) && StoragePathUtil.hasSameStorageRoot(dir, localFile)) {
+ for (CloudFile cloudFile : cloudFiles) {
+ FileReference localFile = localIoManager.resolve(cloudFile.getPath());
+ if (isInNodePartition(cloudFile.getPath()) && StoragePathUtil.hasSameStorageRoot(dir, localFile)) {
localFiles.add(localFile);
}
}
@@ -177,4 +173,9 @@
cacher.close();
replacer.replace();
}
+
+ private static boolean isTxnDir(FileReference dir) {
+ return dir.getRelativePath().startsWith(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME)
+ || dir.getName().equals(StorageConstants.GLOBAL_TXN_DIR_NAME);
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
index 08cbf43..89ea4b1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
@@ -18,22 +18,33 @@
*/
package org.apache.asterix.cloud.util;
+import java.io.FilenameFilter;
import java.util.Iterator;
import java.util.Set;
+import org.apache.asterix.cloud.clients.CloudFile;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class CloudFileUtil {
private static final Logger LOGGER = LogManager.getLogger();
+ // TODO Should we consider bloomfilter and LAF files as metadata files so that they are downloaded on bootstrap?
+ public static final FilenameFilter METADATA_FILTER =
+ ((dir, name) -> name.startsWith(StorageConstants.INDEX_NON_DATA_FILES_PREFIX)
+ || name.endsWith(AbstractLSMIndexFileManager.LAF_SUFFIX)
+ || name.endsWith(AbstractLSMIndexFileManager.BLOOM_FILTER_SUFFIX));
+ public static final FilenameFilter DATA_FILTER = ((dir, name) -> !METADATA_FILTER.accept(dir, name));
+
private CloudFileUtil() {
}
- public static void cleanDirectoryFiles(IOManager ioManager, Set<String> cloudFiles, FileReference partitionPath)
+ public static void cleanDirectoryFiles(IOManager ioManager, Set<CloudFile> cloudFiles, FileReference partitionPath)
throws HyracksDataException {
// First get the set of local files
Set<FileReference> localFiles = ioManager.list(partitionPath);
@@ -47,7 +58,7 @@
continue;
}
- String path = file.getRelativePath();
+ CloudFile path = CloudFile.of(file.getRelativePath());
if (!cloudFiles.contains(path)) {
// Delete local files that do not exist in cloud storage (the ground truth for valid files)
logDeleteFile(file);
@@ -60,12 +71,13 @@
}
// Add the remaining files that are not stored locally (if any)
- for (String cloudFile : cloudFiles) {
- if (!cloudFile.contains(partitionPath.getRelativePath())) {
+ for (CloudFile cloudFile : cloudFiles) {
+ String cloudFilePath = cloudFile.getPath();
+ if (!cloudFilePath.contains(partitionPath.getRelativePath())) {
continue;
}
localFiles.add(new FileReference(partitionPath.getDeviceHandle(),
- cloudFile.substring(cloudFile.indexOf(partitionPath.getRelativePath()))));
+ cloudFilePath.substring(cloudFilePath.indexOf(partitionPath.getRelativePath()))));
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index 4277800..d02b34e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -81,7 +81,7 @@
return false;
}
- cloudWriter = cloudClient.createdWriter(bucket, fullPath, bufferProvider);
+ cloudWriter = cloudClient.createWriter(bucket, fullPath, bufferProvider);
CloudOutputStream outputStream = new CloudOutputStream(cloudWriter);
printer.newStream(outputStream);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 75c4ec5..d73957e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -135,7 +135,7 @@
byte[] data = new byte[Long.BYTES];
LongPointable.setLong(data, 0, writeValue);
IWriteBufferProvider bufferProvider = new WriterSingleBufferProvider(testClient.getWriteBufferSize());
- ICloudWriter writer = testClient.createdWriter(bucket, path, bufferProvider);
+ ICloudWriter writer = testClient.createWriter(bucket, path, bufferProvider);
boolean aborted = false;
try {
writer.write(data, 0, data.length);
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
index 484f372..e6b5534 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/AbstractLSMTest.java
@@ -55,7 +55,7 @@
public void a1writeToS3Test() throws IOException {
IWriteBufferProvider bufferProvider = new WriterSingleBufferProvider(CLOUD_CLIENT.getWriteBufferSize());
ICloudWriter cloudWriter =
- CLOUD_CLIENT.createdWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b", bufferProvider);
+ CLOUD_CLIENT.createWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b", bufferProvider);
try {
ByteBuffer content = createContent(BUFFER_SIZE);
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index a0e5443..cc75514 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -131,10 +131,18 @@
<url>http://asm.objectweb.org/license.html</url>
</override>
<override>
+ <gav>asm:asm:3.1</gav>
+ <url>https://asm.ow2.io/license.html</url>
+ </override>
+ <override>
<gav>com.thoughtworks.paranamer:paranamer:2.3</gav>
<url>https://github.com/codehaus/paranamer-git/blob/paranamer-2.3/LICENSE.txt</url>
</override>
<override>
+ <gav>com.github.jnr:jnr-posix:3.1.19</gav>
+ <url>https://www.eclipse.org/legal/epl-2.0</url>
+ </override>
+ <override>
<gav>org.codehaus.jettison:jettison:1.1</gav>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</override>
diff --git a/asterixdb/src/main/licenses/content/asm.ow2.io_license.html.txt b/asterixdb/src/main/licenses/content/asm.ow2.io_license.html.txt
new file mode 100644
index 0000000..c5aba7b
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/asm.ow2.io_license.html.txt
@@ -0,0 +1,29 @@
+Copyright (c) 2000-2011 INRIA, France Telecom
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holders nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
index 69d33e8..c75e83a 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -36,13 +36,13 @@
public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
throws HyracksDataException {
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
- return cloudIOManager.cloudWrite(handle, offset, data);
+ return cloudIOManager.cloudWrite(handle, data);
}
@Override
public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
throws HyracksDataException {
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
- return cloudIOManager.cloudWrite(handle, offset, data);
+ return cloudIOManager.cloudWrite(handle, data);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 430aac7..83b7629 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -62,21 +62,19 @@
* Write to cloud only
*
* @param fHandle file handle
- * @param offset starting offset
* @param data to write
* @return number of written bytes
*/
- int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+ int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException;
/**
* Write to cloud only
*
* @param fHandle file handle
- * @param offset starting offset
* @param data to write
* @return number of written bytes
*/
- long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException;
+ long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException;
/**
* Punch a hole in a file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
index 3fb682c..b9d9c10 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.storage.common.compression.file;
-import java.util.Objects;
-
import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IODeviceHandle;
@@ -57,19 +55,6 @@
return true;
}
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof CompressedFileReference)) {
- return false;
- }
- return super.equals(o) && lafPath.equals(((CompressedFileReference) o).lafPath);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.getRelativePath(), lafPath);
- }
-
/**
* @return the relative path for LAF file
*/