[ASTERIXDB-3158][HYR]: Make IOManager support writing to cloud storage
Change-Id: I362a3cbfcd3fa99f321467cb72d74d388fcdee2b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17453
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 75f4848..c3ff70a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -19,34 +19,39 @@
package org.apache.hyracks.api.io;
import java.io.Closeable;
+import java.io.FilenameFilter;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IIOManager extends Closeable {
- public enum FileReadWriteMode {
+
+ enum FileReadWriteMode {
READ_ONLY,
READ_WRITE
}
- public enum FileSyncMode {
+ enum FileSyncMode {
METADATA_SYNC_DATA_SYNC,
METADATA_ASYNC_DATA_SYNC,
METADATA_ASYNC_DATA_ASYNC
}
- public List<IODeviceHandle> getIODevices();
+ List<IODeviceHandle> getIODevices();
- public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+ IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
throws HyracksDataException;
- public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+ int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
- public long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException;
+ long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException;
- public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+ int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
@@ -54,17 +59,17 @@
IAsyncRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
- public void close(IFileHandle fHandle) throws HyracksDataException;
+ void close(IFileHandle fHandle) throws HyracksDataException;
- public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+ void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
- public void truncate(IFileHandle fileHandle, long size) throws HyracksDataException;
+ void truncate(IFileHandle fileHandle, long size) throws HyracksDataException;
- public long getSize(IFileHandle fileHandle);
+ long getSize(IFileHandle fileHandle);
- public WritableByteChannel newWritableChannel(IFileHandle fileHandle);
+ WritableByteChannel newWritableChannel(IFileHandle fileHandle);
- public void deleteWorkspaceFiles() throws HyracksDataException;
+ void deleteWorkspaceFiles() throws HyracksDataException;
/**
* @param ioDeviceId
@@ -85,11 +90,10 @@
/**
* Gets a file reference from an absolute path
*
- * @deprecated
- * use getFileRef(int ioDeviceId, String path) instead
* @param path
* @return A file reference based on the mounting point of {@code ioDeviceId} and the passed {@code relativePath}
* @throws HyracksDataException
+ * @deprecated use getFileRef(int ioDeviceId, String path) instead
*/
@Deprecated
FileReference resolveAbsolutePath(String path) throws HyracksDataException;
@@ -109,4 +113,31 @@
* @return the total disk usage in bytes
*/
long getTotalDiskUsage();
+
+ /**
+ * Delete any additional artifacts associated with the file reference
+ *
+ * @param fileRef
+ */
+ void delete(FileReference fileRef) throws HyracksDataException;
+
+ Set<FileReference> list(FileReference dir) throws HyracksDataException;
+
+ Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException;
+
+ void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException;
+
+ byte[] readAllBytes(FileReference fileRef) throws HyracksDataException;
+
+ void copyDirectory(FileReference srcMetadataScopePath, FileReference targetMetadataScopePath)
+ throws HyracksDataException;
+
+ void deleteDirectory(FileReference root) throws HyracksDataException;
+
+ // TODO: Remove and use list
+ Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) throws HyracksDataException;
+
+ boolean exists(FileReference fileRef);
+
+ void create(FileReference fileRef) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index ae49cb6..7644a30 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -46,6 +46,7 @@
public class IoUtil {
public static final String FILE_NOT_FOUND_MSG = "Deleting non-existing file!";
+ public static final FilenameFilter NO_OP_FILTER = (dir, name) -> true;
private static final Logger LOGGER = LogManager.getLogger();
private IoUtil() {
@@ -189,4 +190,8 @@
}
}
}
+
+    public static String getFileNameFromPath(String path) {
+        return path.substring(1 + path.lastIndexOf('/')); // no '/' => -1 + 1 == 0 => the whole path is returned
+    }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index d68c291..223c71c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -95,5 +95,36 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sdk-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>regions</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.findify</groupId>
+ <artifactId>s3mock_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-http-core_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 57bda8b..6c74838 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -30,7 +30,7 @@
public class FileHandle implements IFileHandle {
- private final FileReference fileRef;
+ protected final FileReference fileRef;
private RandomAccessFile raf;
private String mode;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index dee61a4..984ab08 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -26,11 +26,15 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -68,11 +72,11 @@
private final BlockingQueue<IoRequest> freeRequests;
private final List<IODeviceHandle> ioDevices;
private final List<IODeviceHandle> workspaces;
+ private final IFileDeviceResolver deviceComputer;
/*
* Mutables
*/
private int workspaceIndex;
- private final IFileDeviceResolver deviceComputer;
public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize)
throws HyracksDataException {
@@ -103,6 +107,20 @@
}
}
+    protected IOManager(IOManager ioManager, int queueSize, int ioParallelism) {
+        this.deviceComputer = ioManager.deviceComputer;
+        this.ioDevices = ioManager.ioDevices;
+        this.workspaces = ioManager.workspaces;
+        this.workspaceIndex = 0;
+        this.freeRequests = new ArrayBlockingQueue<>(queueSize);
+        this.submittedRequests = new ArrayBlockingQueue<>(queueSize);
+        final int handlerCount = this.ioDevices.size() * ioParallelism; // one pool thread per request handler
+        this.executor = Executors.newFixedThreadPool(handlerCount);
+        for (int handlerId = 0; handlerId < handlerCount; handlerId++) {
+            this.executor.execute(new IoRequestHandler(handlerId, this.submittedRequests));
+        }
+    }
+
public IoRequest getOrAllocRequest() {
IoRequest request = freeRequests.poll();
if (request == null) {
@@ -242,7 +260,7 @@
* @param offset
* @param data
* @return The number of bytes read, possibly zero, or -1 if the given offset is greater than or equal to the file's
- * current size
+ * current size
* @throws HyracksDataException
*/
@Override
@@ -473,4 +491,110 @@
}
};
}
+
+ @Override
+ public void delete(FileReference fileRef) throws HyracksDataException {
+ if (fileRef.getFile().exists()) { // a file that is not on local disk is skipped silently
+ IoUtil.delete(fileRef);
+ }
+ }
+
+ @Override
+ public Set<FileReference> list(FileReference dir) throws HyracksDataException {
+ return list(dir, IoUtil.NO_OP_FILTER); // unfiltered listing: NO_OP_FILTER accepts every entry
+ }
+
+    @Override
+    public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        // A missing directory yields an empty (mutable) set; an existing path that cannot be
+        // listed (unreadable, not a directory, or any other I/O failure) is reported as an error.
+        final File directory = dir.getFile();
+        final Set<FileReference> children = new HashSet<>();
+        if (!directory.exists()) {
+            return children;
+        }
+
+        final String[] names = directory.list(filter);
+        if (names == null) {
+            if (!directory.canRead()) {
+                throw HyracksDataException.create(ErrorCode.CANNOT_READ_FILE, dir);
+            }
+            if (!directory.isDirectory()) {
+                throw HyracksDataException.create(ErrorCode.FILE_IS_NOT_DIRECTORY, dir);
+            }
+            throw HyracksDataException.create(ErrorCode.UNIDENTIFIED_IO_ERROR_READING_FILE, dir);
+        }
+
+        for (String name : names) {
+            children.add(dir.getChild(name));
+        }
+        return children;
+    }
+
+    @Override
+    public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
+        final File target = fileRef.getFile();
+        try {
+            if (!target.exists()) {
+                FileUtils.createParentDirectories(target);
+            } else {
+                delete(fileRef); // overridable call: a subclass' delete(...) is what runs here
+            }
+            FileUtil.writeAndForce(target.toPath(), bytes);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
+        // A file that does not exist on local disk is reported as null
+        // rather than as an empty array or an exception.
+        final File source = fileRef.getFile();
+        try {
+            return source.exists() ? Files.readAllBytes(source.toPath()) : null;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+ @Override
+ public void deleteDirectory(FileReference root) throws HyracksDataException {
+ try {
+ FileUtils.deleteDirectory(root.getFile()); // commons-io: removes the directory together with its contents
+ } catch (IOException e) {
+ throw HyracksDataException.create(e); // surface the I/O failure as the declared checked exception
+ }
+ }
+
+    @Override
+    public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter)
+            throws HyracksDataException {
+        // Matches are found on local disk and converted to file references via resolveAbsolutePath
+        final Path rootPath = root.getFile().toPath();
+        final Set<FileReference> matches = new HashSet<>();
+        for (File match : IoUtil.getMatchingFiles(rootPath, filter)) {
+            matches.add(resolveAbsolutePath(match.getAbsolutePath()));
+        }
+        return matches;
+    }
+
+ @Override
+ public boolean exists(FileReference fileRef) {
+ return fileRef.getFile().exists(); // checks the local file system only
+ }
+
+ @Override
+ public void create(FileReference fileRef) throws HyracksDataException {
+ IoUtil.create(fileRef); // pure delegation; handling of an already existing file is decided by IoUtil.create
+ }
+
+ @Override
+ public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
+ try {
+ FileUtils.copyDirectory(srcFileRef.getFile(), destFileRef.getFile()); // commons-io copy between local paths
+ } catch (IOException e) {
+ throw HyracksDataException.create(e); // surface the I/O failure as the declared checked exception
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudFileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudFileHandle.java
new file mode 100644
index 0000000..c7ed9c0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudFileHandle.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.FileHandle;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+
+public class CloudFileHandle extends FileHandle {
+    private final CloudResettableInputStream inputStream;
+
+    public CloudFileHandle(ICloudClient cloudClient, String bucket, FileReference fileRef,
+            WriteBufferProvider bufferProvider) {
+        super(fileRef);
+        final String key = fileRef.getRelativePath(); // the relative path is what the cloud writer is created for
+        inputStream = new CloudResettableInputStream(cloudClient.createBufferedWriter(bucket, key), bufferProvider);
+    }
+
+    @Override
+    public void open(IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode) throws IOException {
+        if (this.fileRef.getFile().exists()) { // the local file is opened only when it exists on disk
+            super.open(rwMode, syncMode);
+        }
+    }
+
+    public CloudResettableInputStream getInputStream() {
+        return this.inputStream;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudIOManager.java
new file mode 100644
index 0000000..f753f4f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudIOManager.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.control.nc.io.cloud.clients.CloudClientProvider;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/** IOManager that mirrors file operations to cloud storage and falls back to it for files missing locally. */
+public class CloudIOManager extends IOManager {
+    // TODO(htowaileb): temporary, will need to be read from somewhere
+    public static final String STORAGE_ROOT_DIR_NAME = "storage";
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICloudClient cloudClient;
+    private final WriteBufferProvider writeBufferProvider;
+    private final String bucket;
+
+    public CloudIOManager(IIOManager ioManager, int queueSize, int ioParallelism) throws HyracksDataException {
+        super((IOManager) ioManager, queueSize, ioParallelism);
+        // TODO(htowaileb): temporary, this needs to be provided somehow
+        try {
+            List<String> lines = FileUtils.readLines(new File("/etc/s3"), "UTF-8");
+            if (lines.isEmpty()) {
+                throw new IOException("/etc/s3 is empty; expected the bucket name on its first line");
+            }
+            bucket = lines.get(0);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        cloudClient = CloudClientProvider.getClient(CloudClientProvider.ClientType.S3);
+        writeBufferProvider = new WriteBufferProvider(ioManager.getIODevices().size() * ioParallelism);
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    @Override
+    public long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
+        long writtenBytes = super.doSyncWrite(fHandle, offset, dataArray);
+        CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
+        try {
+            // Stage the same bytes for upload; expects exactly two buffers: {header, page}
+            inputStream.write(dataArray[0], dataArray[1]);
+        } catch (HyracksDataException e) {
+            inputStream.abort();
+            throw e;
+        }
+        return writtenBytes;
+    }
+
+    @Override
+    public int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer dataArray) throws HyracksDataException {
+        int writtenBytes = super.doSyncWrite(fHandle, offset, dataArray);
+        CloudResettableInputStream cloudInputStream = ((CloudFileHandle) fHandle).getInputStream();
+        try {
+            // Local write first, then stage the same buffer for upload
+            cloudInputStream.write(dataArray);
+        } catch (HyracksDataException e) {
+            cloudInputStream.abort(); // give up the pending upload and release the staging buffer
+            throw e;
+        }
+        return writtenBytes;
+    }
+
+    @Override
+    public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+            throws HyracksDataException {
+        CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, fileRef, writeBufferProvider);
+        if (!super.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
+            ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
+            try {
+                // TODO: We need a proper caching mechanism
+                LOGGER.info("Downloading {} from cloud storage..", fileRef.getRelativePath());
+                LocalCacheUtil.download(cloudClient, this, fHandle, rwMode, syncMode, writeBuffer);
+                super.close(fHandle);
+                LOGGER.info("Finished downloading {} from cloud storage..", fileRef.getRelativePath());
+            } finally {
+                writeBufferProvider.recycle(writeBuffer);
+            }
+        }
+        // CloudFileHandle#open only opens the local file when it exists on disk
+        try {
+            fHandle.open(rwMode, syncMode);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return fHandle;
+    }
+
+    @Override
+    public void delete(FileReference fileRef) throws HyracksDataException {
+        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+            // Never delete the storage dir in cloud storage
+            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+        }
+        super.delete(fileRef);
+    }
+
+    @Override
+    public void close(IFileHandle fHandle) throws HyracksDataException {
+        try {
+            ((CloudFileHandle) fHandle).getInputStream().close(); // cloud-side stream first, local handle after
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        super.close(fHandle);
+    }
+
+    @Override
+    public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+        Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
+        if (cloudFiles.isEmpty()) {
+            return Collections.emptySet();
+        }
+
+        // First get the set of local files
+        Set<FileReference> localFiles = super.list(dir, filter);
+        Iterator<FileReference> localFilesIter = localFiles.iterator();
+
+        // Reconcile local files and cloud files
+        while (localFilesIter.hasNext()) {
+            FileReference file = localFilesIter.next();
+            if (file.getFile().isDirectory()) {
+                continue;
+            }
+
+            String path = file.getRelativePath();
+            if (!cloudFiles.contains(path)) {
+                // Delete local files that do not exist in cloud storage (the ground truth for valid files)
+                localFilesIter.remove();
+                super.delete(file);
+            } else {
+                // No need to re-add it in the following loop
+                cloudFiles.remove(path);
+            }
+        }
+
+        // Add the remaining files that are not stored locally (if any)
+        for (String cloudFile : cloudFiles) {
+            localFiles.add(dir.getChild(IoUtil.getFileNameFromPath(cloudFile)));
+        }
+        return localFiles; // already a fresh HashSet built by IOManager#list; no defensive copy needed
+    }
+
+    @Override
+    public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
+        HyracksDataException savedEx = null;
+        if (metadata) {
+            // only finish writing if metadata == true to prevent write limiter from finishing the stream and
+            // completing the upload.
+            CloudResettableInputStream stream = ((CloudFileHandle) fileHandle).getInputStream();
+            try {
+                stream.finish();
+            } catch (HyracksDataException e) {
+                savedEx = e;
+            }
+
+            if (savedEx != null) {
+                try {
+                    stream.abort();
+                } catch (HyracksDataException e) {
+                    savedEx.addSuppressed(e); // keep the finish() failure as the primary error
+                }
+                throw savedEx;
+            }
+        }
+        // Sync only after finalizing the upload to cloud storage
+        super.sync(fileHandle, metadata);
+    }
+
+    @Override
+    public long getSize(IFileHandle fileHandle) {
+        if (fileHandle.getFileReference().getFile().exists()) {
+            // This should always provide the correct size despite what is buffered in local disk
+            return super.getSize(fileHandle);
+        }
+        return cloudClient.getObjectSize(bucket, fileHandle.getFileReference().getRelativePath());
+    }
+
+    @Override
+    public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
+        super.overwrite(fileRef, bytes);
+        // Write here will overwrite the older object if exists
+        cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
+    }
+
+    // TODO utilize locally stored files for reading
+    @Override
+    public int doSyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        if (fHandle.getFileReference().getFile().exists()) {
+            return super.doSyncRead(fHandle, offset, data);
+        }
+        return cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+    }
+
+    // TODO: We need to download this too
+    @Override
+    public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
+        if (fileRef.getFile().exists()) {
+            return super.readAllBytes(fileRef);
+        }
+        return cloudClient.readAllBytes(bucket, fileRef.getRelativePath());
+    }
+
+    @Override
+    public void deleteDirectory(FileReference fileRef) throws HyracksDataException {
+        super.deleteDirectory(fileRef);
+        if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+            // Never delete the storage dir in cloud storage
+            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+        }
+    }
+
+    @Override
+    public Collection<FileReference> getMatchingFiles(FileReference root, FilenameFilter filter) {
+        Set<String> paths = cloudClient.listObjects(bucket, root.getRelativePath(), filter);
+        List<FileReference> fileReferences = new ArrayList<>();
+        for (String path : paths) {
+            fileReferences.add(new FileReference(root.getDeviceHandle(), path));
+        }
+        return fileReferences;
+    }
+
+    @Override
+    public boolean exists(FileReference fileRef) {
+        // Check if the file exists locally first as newly created files (i.e., they are empty) are not stored in cloud storage
+        return fileRef.getFile().exists() || cloudClient.exists(bucket, fileRef.getRelativePath());
+    }
+
+    @Override
+    public void create(FileReference fileRef) throws HyracksDataException {
+        // We need to delete the local file on create as the cloud storage didn't complete the upload
+        // In other words, both cloud files and the local files are not in sync
+        super.delete(fileRef);
+        super.create(fileRef);
+    }
+
+    @Override
+    public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
+        cloudClient.copy(bucket, srcFileRef.getRelativePath(), destFileRef);
+        super.copyDirectory(srcFileRef, destFileRef);
+    }
+
+    protected long writeLocally(IFileHandle fHandle, long offset, ByteBuffer buffer) throws HyracksDataException {
+        return super.doSyncWrite(fHandle, offset, buffer); // local disk only; nothing is staged for upload
+    }
+
+    protected void syncLocally(IFileHandle fileHandle) throws HyracksDataException {
+        super.sync(fileHandle, true); // local disk only; the cloud stream is not finished here
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudResettableInputStream.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudResettableInputStream.java
new file mode 100644
index 0000000..84af84f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/CloudResettableInputStream.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+
+/** Stages written pages in a pooled buffer and serves them as the InputStream handed to the cloud writer. */
+public class CloudResettableInputStream extends InputStream {
+    // TODO: make configurable
+    public static final int MIN_BUFFER_SIZE = 5 * 1024 * 1024;
+    private final WriteBufferProvider bufferProvider;
+    private final ICloudBufferedWriter bufferedWriter;
+    private ByteBuffer writeBuffer;
+
+    public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, WriteBufferProvider bufferProvider) {
+        this.bufferedWriter = bufferedWriter;
+        this.bufferProvider = bufferProvider;
+    }
+
+    private void open() {
+        if (writeBuffer == null) { // borrow a buffer lazily: on first use and again after finish()/abort()
+            writeBuffer = bufferProvider.getBuffer();
+            writeBuffer.clear();
+        }
+    }
+
+    @Override
+    public void reset() {
+        writeBuffer.reset(); // rewinds to the position recorded by mark(int)
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    @Override
+    public synchronized void mark(int readLimit) {
+        writeBuffer.mark(); // readLimit is not used; only the buffer's own mark is set
+    }
+
+    public void write(ByteBuffer header, ByteBuffer page) throws HyracksDataException {
+        open();
+        write(header);
+        write(page);
+    }
+
+    public int write(ByteBuffer page) throws HyracksDataException {
+        open();
+
+        // Bytes [0, limit) of the page are written regardless of its current position
+        int size = page.limit();
+
+        // full buffer = upload -> write all
+        if (writeBuffer.remaining() == 0) {
+            uploadAndWait();
+        }
+
+        // write partial -> upload -> write -> upload -> ...
+        int offset = page.arrayOffset(); // buffer index 0 lives at arrayOffset() in the backing array
+        int pageRemaining = size;
+        while (pageRemaining > 0) {
+            // enough to write all
+            if (writeBuffer.remaining() > pageRemaining) {
+                writeBuffer.put(page.array(), offset, pageRemaining);
+                return size;
+            }
+
+            int remaining = writeBuffer.remaining();
+            writeBuffer.put(page.array(), offset, remaining);
+            pageRemaining -= remaining;
+            offset += remaining;
+            uploadAndWait();
+        }
+
+        return size;
+    }
+
+    public void finish() throws HyracksDataException {
+        open();
+        try {
+            if (writeBuffer.position() > 0) { // upload whatever is still staged
+                uploadAndWait();
+            }
+            bufferedWriter.finish();
+        } finally {
+            returnBuffer();
+        }
+    }
+
+    public void abort() throws HyracksDataException {
+        try {
+            bufferedWriter.abort();
+        } finally {
+            returnBuffer(); // the buffer goes back to the provider even if abort() fails
+        }
+    }
+
+    private void uploadAndWait() throws HyracksDataException {
+        writeBuffer.flip(); // switch from filling to draining: position = 0, limit = bytes staged
+        try {
+            bufferedWriter.upload(this, writeBuffer.limit());
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+
+        writeBuffer.clear();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int available = writeBuffer.remaining();
+        if (len == 0 || available == 0) {
+            return len == 0 ? 0 : -1; // InputStream contract: zero-length reads return 0, even at end of data
+        }
+        int length = Math.min(len, available);
+        writeBuffer.get(b, off, length);
+        return length;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return writeBuffer.hasRemaining() ? writeBuffer.get() & 0xFF : -1; // unsigned byte, or -1 at end of data
+    }
+
+    private void returnBuffer() {
+        if (writeBuffer != null) {
+            bufferProvider.recycle(writeBuffer);
+            writeBuffer = null; // forces open() to borrow a fresh buffer next time
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/LocalCacheUtil.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/LocalCacheUtil.java
new file mode 100644
index 0000000..ada32b6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/LocalCacheUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+import org.apache.hyracks.util.file.FileUtil;
+
+// TODO replace with a proper caching mechanism
+public class LocalCacheUtil {
+    private LocalCacheUtil() {
+        // static helpers only
+    }
+
+    /** Writes {@code bytes} to the local file of {@code fileRef}, creating its parent directories when needed. */
+    public static void writeToFile(FileReference fileRef, byte[] bytes) throws HyracksDataException {
+        try {
+            File file = fileRef.getFile();
+            FileUtils.createParentDirectories(file);
+            FileUtil.writeAndForce(file.toPath(), bytes);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    // TODO: replace with proper caching policy
+    /** Streams the cloud object behind {@code fileHandle} into a newly created local file, then leaves it open. */
+    public static void download(ICloudClient cloudClient, CloudIOManager ioManager, CloudFileHandle fileHandle,
+            IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode, ByteBuffer writeBuffer)
+            throws HyracksDataException {
+        FileReference fileRef = fileHandle.getFileReference();
+        // write the file locally (the call to open is synchronized hence only one thread can perform this call)
+        try (InputStream inStream = cloudClient.getObjectStream(ioManager.getBucket(), fileRef.getRelativePath())) {
+            File file = fileRef.getFile();
+            FileUtils.createParentDirectories(file);
+            if (!file.createNewFile()) {
+                throw new IllegalStateException("Couldn't create local file");
+            }
+            fileHandle.open(rwMode, syncMode);
+            writeToFile(ioManager, fileHandle, inStream, writeBuffer);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    // Copies inStream into the file one buffer at a time; reads land directly in writeBuffer's backing array.
+    private static void writeToFile(CloudIOManager ioManager, IFileHandle fileHandle, InputStream inStream,
+            ByteBuffer writeBuffer) throws HyracksDataException {
+        writeBuffer.clear();
+        try {
+            long offset = 0;
+            int read;
+            while ((read = inStream.read(writeBuffer.array(), writeBuffer.position(), writeBuffer.remaining())) >= 0) {
+                writeBuffer.position(writeBuffer.position() + read);
+                if (!writeBuffer.hasRemaining()) {
+                    offset += writeBufferToFile(ioManager, fileHandle, writeBuffer, offset); // full buffer: flush it
+                }
+            }
+            if (writeBuffer.position() > 0) {
+                offset += writeBufferToFile(ioManager, fileHandle, writeBuffer, offset); // trailing partial buffer
+            }
+            if (offset > 0) { // sync whenever data was written, also when the size is a multiple of the buffer
+                ioManager.syncLocally(fileHandle);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private static long writeBufferToFile(CloudIOManager ioManager, IFileHandle fileHandle, ByteBuffer writeBuffer,
+            long offset) throws HyracksDataException {
+        writeBuffer.flip(); // expose the bytes gathered so far for writing
+        long written = ioManager.writeLocally(fileHandle, offset, writeBuffer);
+        writeBuffer.clear(); // ready for the next fill
+        return written;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/WriteBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/WriteBufferProvider.java
new file mode 100644
index 0000000..44adf45
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/WriteBufferProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud;
+
+import static org.apache.hyracks.control.nc.io.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/** Pool of write buffers: hands out recycled buffers and allocates a new one when the pool is empty. */
+public class WriteBufferProvider {
+    private final BlockingQueue<ByteBuffer> writeBuffers;
+
+    public WriteBufferProvider(int ioParallelism) {
+        this.writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
+    }
+
+    /** Offers {@code buffer} back to the pool; the offer does not block, so it is dropped when the pool is full. */
+    public void recycle(ByteBuffer buffer) {
+        writeBuffers.offer(buffer);
+    }
+
+    public ByteBuffer getBuffer() {
+        // Reuse a pooled buffer when one is available, otherwise allocate a fresh one.
+        final ByteBuffer pooled = writeBuffers.poll();
+        return pooled != null ? pooled : ByteBuffer.allocate(MIN_BUFFER_SIZE);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/CloudClientProvider.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/CloudClientProvider.java
new file mode 100644
index 0000000..72aa566
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/CloudClientProvider.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients;
+
+import java.util.HashMap;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.hyracks.control.nc.io.cloud.clients.azure.blob.AzureBlobCloudClient;
+import org.apache.hyracks.control.nc.io.cloud.clients.gcp.gcs.GCSCloudClient;
+
+public class CloudClientProvider { // static factory: maps a ClientType to an ICloudClient
+
+ public enum ClientType {
+ NO_OP,
+ S3,
+ AZURE_BLOB,
+ GOOGLE_CLOUD_STORAGE
+ }
+
+ private CloudClientProvider() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ public static ICloudClient getClient(ClientType clientType) throws HyracksDataException {
+ switch (clientType) {
+ case NO_OP:
+ return NoOpCloudClient.INSTANCE; // shared singleton, nothing to configure
+ case S3:
+ // TODO: map should have the config already
+ return new S3CloudClient(new HashMap<>()); // a new client per call, built from an empty config map
+ case AZURE_BLOB:
+ return new AzureBlobCloudClient();
+ case GOOGLE_CLOUD_STORAGE:
+ return new GCSCloudClient();
+ default:
+ throw HyracksDataException.create(new IllegalArgumentException("Unknown cloud client type"));
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudBufferedWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudBufferedWriter.java
new file mode 100644
index 0000000..8bb2d68
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudBufferedWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients;
+
+import java.io.InputStream;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ICloudBufferedWriter {
+
+ /**
+ * Uploads input stream content
+ *
+ * @param stream stream to read the content from
+ * @param length number of bytes to upload from the stream
+ * @return NOTE(review): documented as "amount uploaded", but S3BufferedWriter returns the part number -- confirm
+ */
+ int upload(InputStream stream, int length);
+
+ /**
+ * Finishes the upload
+ *
+ * @throws HyracksDataException HyracksDataException
+ */
+ void finish() throws HyracksDataException;
+
+ /**
+ * Aborts the upload
+ *
+ * @throws HyracksDataException HyracksDataException
+ */
+ void abort() throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudClient.java
new file mode 100644
index 0000000..bb12769
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/ICloudClient.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+/**
+ * Interface containing methods to perform IO operation on the Cloud Storage
+ */
+public interface ICloudClient {
+
+ /**
+ * Creates a cloud buffered writer
+ *
+ * @param bucket bucket to write to
+ * @param path path to write to
+ * @return buffered writer
+ */
+ ICloudBufferedWriter createBufferedWriter(String bucket, String path);
+
+ /**
+ * Lists objects at the specified bucket and path, and applies the file name filter on the returned objects
+ *
+ * @param bucket bucket to list from
+ * @param path path to list from
+ * @param filter filter to apply
+ * @return file names returned after applying the file name filter
+ */
+ Set<String> listObjects(String bucket, String path, FilenameFilter filter);
+
+ /**
+ * Performs a range-read from the specified bucket and path starting at the offset. The amount read is equal to the
+ * buffer.remaining()
+ *
+ * @param bucket bucket
+ * @param path path
+ * @param offset offset
+ * @param buffer buffer
+ * TODO(htowaileb) should this be returning the buffer position or the total amount read?
+ * @return NOTE(review): documented as the buffer position; S3CloudClient returns the number of bytes read -- confirm
+ */
+ int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException;
+
+ /**
+ * Reads all bytes of an object at the specified bucket and path
+ *
+ * @param bucket bucket
+ * @param path path
+ * @return bytes (NOTE(review): S3CloudClient returns {@code null} when the object does not exist)
+ * @throws HyracksDataException HyracksDataException
+ */
+ byte[] readAllBytes(String bucket, String path) throws HyracksDataException;
+
+ /**
+ * Returns the {@code InputStream} of an object at the specified bucket and path
+ *
+ * @param bucket bucket
+ * @param path path
+ * @return inputstream
+ */
+ InputStream getObjectStream(String bucket, String path);
+
+ /**
+ * Writes the content of the byte array into the bucket at the specified path
+ *
+ * @param bucket bucket
+ * @param path path
+ * @param data data
+ */
+ void write(String bucket, String path, byte[] data);
+
+ /**
+ * Copies an object from the source path to the destination path
+ *
+ * @param bucket bucket
+ * @param srcPath source path (S3CloudClient treats it as a key prefix and copies every object listed under it)
+ * @param destPath destination path
+ */
+ void copy(String bucket, String srcPath, FileReference destPath);
+
+ /**
+ * Deletes an object at the specified bucket and path
+ *
+ * @param bucket bucket
+ * @param path path (S3CloudClient treats it as a key prefix and deletes every object listed under it)
+ */
+ void deleteObject(String bucket, String path);
+
+ /**
+ * Returns the size of the object at the specified path
+ *
+ * @param bucket bucket
+ * @param path path
+ * @return size (S3CloudClient returns 0 when no object is found)
+ */
+ long getObjectSize(String bucket, String path);
+
+ /**
+ * Checks if an object exists at the specified path
+ *
+ * @param bucket bucket
+ * @param path path
+ * @return {@code true} if the object exists, {@code false} otherwise
+ */
+ boolean exists(String bucket, String path);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/NoOpCloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/NoOpCloudClient.java
new file mode 100644
index 0000000..2751181
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/NoOpCloudClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+public class NoOpCloudClient implements ICloudClient {
+ // Shared instance of the placeholder client: every operation is a no-op returning null/0/empty/false.
+ public static final NoOpCloudClient INSTANCE = new NoOpCloudClient();
+
+ private NoOpCloudClient() {
+ // do not instantiate
+ }
+
+ @Override
+ public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+ return null; // no writer is provided; callers must not dereference the result
+ }
+
+ @Override
+ public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+ return null; // note: null, not an empty set
+ }
+
+ @Override
+ public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+ return 0; // the buffer is left untouched
+ }
+
+ @Override
+ public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+ return new byte[0];
+ }
+
+ @Override
+ public InputStream getObjectStream(String bucket, String path) {
+ return null; // no stream is provided
+ }
+
+ @Override
+ public void write(String bucket, String path, byte[] data) {
+ // intentionally does nothing
+ }
+
+ @Override
+ public void copy(String bucket, String srcPath, FileReference destPath) {
+ // intentionally does nothing
+ }
+
+ @Override
+ public void deleteObject(String bucket, String path) {
+ // intentionally does nothing
+ }
+
+ @Override
+ public long getObjectSize(String bucket, String path) {
+ return 0;
+ }
+
+ @Override
+ public boolean exists(String bucket, String path) {
+ return false;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3BufferedWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3BufferedWriter.java
new file mode 100644
index 0000000..a793f40
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.aws.s3;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+
+/** Writes one S3 object as a multipart upload: upload() sends a part, finish() completes, abort() cancels. */
+public class S3BufferedWriter implements ICloudBufferedWriter {
+    private final List<CompletedPart> partQueue; // parts uploaded so far, in upload order
+    private final String path;
+    private final S3Client s3Client;
+    private final String bucket;
+    private String uploadId; // null until the first upload() starts the multipart upload
+    private int partNumber;
+    private static final int MAX_RETRIES = 3; // attempts finish() makes before giving up
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public S3BufferedWriter(S3Client s3client, String bucket, String path) {
+        this.s3Client = s3client;
+        this.bucket = bucket;
+        this.path = path;
+        partQueue = new ArrayList<>();
+    }
+
+    @Override
+    public int upload(InputStream stream, int length) {
+        setUploadId();
+        UploadPartRequest upReq =
+                UploadPartRequest.builder().uploadId(uploadId).partNumber(partNumber).bucket(bucket).key(path).build();
+        String etag = s3Client.uploadPart(upReq, RequestBody.fromInputStream(stream, length)).eTag();
+        partQueue.add(CompletedPart.builder().partNumber(partNumber).eTag(etag).build());
+        return partNumber++; // the number of the part just uploaded (starting at 1), not a byte count
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder().parts(partQueue).build();
+        CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
+                .bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
+        int retries = 0;
+        while (true) {
+            try {
+                completeMultipartUpload(completeMultipartUploadRequest);
+                break;
+            } catch (Exception e) {
+                retries++;
+                if (retries == MAX_RETRIES) {
+                    throw HyracksDataException.create(e);
+                }
+                LOGGER.info(() -> "S3 storage write retry, encountered: " + e.getMessage());
+                // Back off for 1 second after the first failure and 2 seconds after the second one
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 2 ? 1 : 2));
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        if (uploadId != null) { // a null id means upload() never started a multipart upload: nothing to abort
+            s3Client.abortMultipartUpload(
+                    AbortMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).build());
+        }
+    }
+
+    private void completeMultipartUpload(CompleteMultipartUploadRequest request) throws HyracksDataException {
+        try {
+            s3Client.completeMultipartUpload(request);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void setUploadId() {
+        if (uploadId == null) {
+            CreateMultipartUploadRequest uploadRequest =
+                    CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
+            CreateMultipartUploadResponse uploadResp = s3Client.createMultipartUpload(uploadRequest);
+            uploadId = uploadResp.uploadId();
+            partNumber = 1;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3CloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3CloudClient.java
new file mode 100644
index 0000000..af1441e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3CloudClient.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.aws.s3;
+
+import static org.apache.hyracks.control.nc.io.cloud.clients.aws.s3.S3Utils.listS3Objects;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class S3CloudClient implements ICloudClient {
+
+    private static final String ACCESS_KEY_ID_FIELD = "accessKeyId";
+    private static final String SECRET_ACCESS_KEY_FIELD = "secretAccessKey";
+    private static final String REGION_FIELD = "region";
+    private static final String ENDPOINT_FIELD = "endpoint";
+
+    private final S3Client s3Client;
+
+    // TODO fix the throws exception
+    public S3CloudClient(Map<String, String> clientConfiguration) throws HyracksDataException {
+        setClientConfig(clientConfiguration); // TODO: remove later, this is temporary
+        s3Client = buildClient(clientConfiguration);
+    }
+
+    private S3Client buildClient(Map<String, String> clientConfiguration) throws HyracksDataException {
+        String accessKeyId = clientConfiguration.get(ACCESS_KEY_ID_FIELD);
+        String secretAccessKey = clientConfiguration.get(SECRET_ACCESS_KEY_FIELD);
+        String region = clientConfiguration.get(REGION_FIELD);
+        String endpoint = clientConfiguration.get(ENDPOINT_FIELD);
+
+        AwsCredentialsProvider credentialsProvider =
+                StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+        S3ClientBuilder builder = S3Client.builder();
+        builder.credentialsProvider(credentialsProvider);
+        builder.region(Region.of(region));
+
+        if (endpoint != null) {
+            try {
+                URI uri = new URI(endpoint);
+                builder.endpointOverride(uri);
+            } catch (Exception ex) {
+                throw HyracksDataException.create(ex);
+            }
+        }
+        return builder.build();
+    }
+
+    // TODO: temporarily setting the client config, this should be provided
+    private void setClientConfig(Map<String, String> clientConfiguration) throws HyracksDataException {
+        if (!clientConfiguration.isEmpty()) {
+            return;
+        }
+
+        try {
+            List<String> lines = FileUtils.readLines(new File("/etc/s3"), "UTF-8");
+            String accessKeyId = lines.get(1); // line 0 of the file is not used
+            String secretAccessKey = lines.get(2);
+            String region = lines.get(3);
+            clientConfiguration.put(ACCESS_KEY_ID_FIELD, accessKeyId);
+            clientConfiguration.put(SECRET_ACCESS_KEY_FIELD, secretAccessKey);
+            clientConfiguration.put(REGION_FIELD, region);
+            if (lines.size() > 4) { // optional fifth line: endpoint override
+                String serviceEndpoint = lines.get(4);
+                clientConfiguration.put(ENDPOINT_FIELD, serviceEndpoint);
+            }
+        } catch (IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+        return new S3BufferedWriter(s3Client, bucket, path);
+    }
+
+    @Override
+    public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
+    }
+
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        if (buffer.remaining() == 0) {
+            return 0; // nothing requested; an empty inclusive byte range cannot be expressed
+        }
+        long readTo = offset + buffer.remaining() - 1; // HTTP byte ranges are inclusive on both ends
+        GetObjectRequest rangeGetObjectRequest =
+                GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
+
+        int totalRead = 0;
+        // TODO(htowaileb): add retry logic here
+        try (ResponseInputStream<GetObjectResponse> response = s3Client.getObject(rangeGetObjectRequest)) {
+            while (buffer.remaining() > 0) {
+                int read = response.read(buffer.array(), buffer.position(), buffer.remaining());
+                if (read < 0) {
+                    break; // stream ended before the buffer was filled; reported by the check below
+                }
+                buffer.position(buffer.position() + read);
+                totalRead += read;
+            }
+        } catch (IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+        if (buffer.remaining() != 0) {
+            throw new IllegalStateException("Expected buffer remaining = 0, found: " + buffer.remaining());
+        }
+        return totalRead;
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
+        // try-with-resources closes the response stream once the bytes are read
+        try (ResponseInputStream<GetObjectResponse> stream = s3Client.getObject(getReq)) {
+            return stream.readAllBytes();
+        } catch (NoSuchKeyException e) {
+            return null;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path) {
+        GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
+        try {
+            return s3Client.getObject(getReq);
+        } catch (NoSuchKeyException e) {
+            // This should not happen at least from the only caller of this method
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(path).build();
+        // TODO(htowaileb): add retry logic here
+        s3Client.putObject(putReq, RequestBody.fromBytes(data));
+    }
+
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
+        for (S3Object object : objects) {
+            String srcKey = object.key();
+            String destKey = destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
+            CopyObjectRequest copyReq = CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
+                    .destinationBucket(bucket).destinationKey(destKey).build();
+            s3Client.copyObject(copyReq);
+        }
+    }
+
+    @Override
+    public void deleteObject(String bucket, String path) {
+        Set<String> fileList = listObjects(bucket, path, IoUtil.NO_OP_FILTER);
+        if (fileList.isEmpty()) {
+            return;
+        }
+        // NOTE(review): S3 DeleteObjects takes at most 1000 keys per request; larger listings need batching
+        List<ObjectIdentifier> objectIdentifiers = new ArrayList<>();
+        for (String file : fileList) {
+            objectIdentifiers.add(ObjectIdentifier.builder().key(file).build());
+        }
+        Delete delete = Delete.builder().objects(objectIdentifiers).build();
+        DeleteObjectsRequest deleteReq = DeleteObjectsRequest.builder().bucket(bucket).delete(delete).build();
+        s3Client.deleteObjects(deleteReq);
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) {
+        List<S3Object> objects = listS3Objects(s3Client, bucket, path);
+        if (objects.isEmpty()) {
+            return 0;
+        }
+        return objects.get(0).size(); // first entry of a prefix listing -- may be a longer key sharing the prefix
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) {
+        List<S3Object> objects = listS3Objects(s3Client, bucket, path); // prefix listing, not an exact-key lookup
+        return !objects.isEmpty();
+    }
+
+    private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
+        Set<String> files = new HashSet<>();
+        for (S3Object s3Object : contents) {
+            String path = s3Object.key();
+            if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
+                files.add(path);
+            }
+        }
+        return files;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3Utils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3Utils.java
new file mode 100644
index 0000000..e29b611
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/aws/s3/S3Utils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.aws.s3;
+
+import java.util.List;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class S3Utils {
+
+ private S3Utils() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ public static List<S3Object> listS3Objects(S3Client s3Client, String bucket, String path) {
+ String newMarker = null;
+
+ ListObjectsV2Response listObjectsResponse;
+ ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(bucket);
+ listObjectsBuilder.prefix(path);
+
+ while (true) {
+ // List the objects from the start, or from the last marker in case of truncated result
+ if (newMarker == null) {
+ listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+ } else {
+ listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+ }
+
+ // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
+ if (Boolean.FALSE.equals(listObjectsResponse.isTruncated())) {
+ break;
+ } else {
+ newMarker = listObjectsResponse.nextContinuationToken();
+ }
+ }
+ return listObjectsResponse.contents();
+ }
+
+ // TODO(htowaileb): Test few runs with default client and see if any failures are encountered
+ // private static SdkHttpClient buildHttpClient() {
+ // ApacheHttpClient.Builder apacheClientBuilder = ApacheHttpClient.builder();
+ //
+ // AttributeMap.Builder overriddenConfigBuilder = AttributeMap.builder();
+ // overriddenConfigBuilder.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, 128);
+ // overriddenConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, Duration.ofMinutes(60));
+ // overriddenConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, Duration.ofMinutes(60));
+ // overriddenConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT, Duration.ofMinutes(60));
+ // overriddenConfigBuilder.put(SdkHttpConfigurationOption.READ_TIMEOUT, Duration.ofMinutes(60));
+ // overriddenConfigBuilder.put(SdkHttpConfigurationOption.WRITE_TIMEOUT, Duration.ofMinutes(60));
+ // overriddenConfigBuilder.put(SdkHttpConfigurationOption.TCP_KEEPALIVE, Boolean.TRUE);
+ // AttributeMap configuration = overriddenConfigBuilder.build();
+ //
+ // return apacheClientBuilder.buildWithDefaults(configuration);
+ // }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/azure/blob/AzureBlobCloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/azure/blob/AzureBlobCloudClient.java
new file mode 100644
index 0000000..6dfe05a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/azure/blob/AzureBlobCloudClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.azure.blob;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+
+public class AzureBlobCloudClient implements ICloudClient {
+ // NYI placeholder: the constructor always throws, so the stub methods below are not reachable on an instance
+ public AzureBlobCloudClient() {
+ throw new IllegalStateException("NYI");
+ }
+
+ @Override
+ public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+ return null;
+ }
+
+ @Override
+ public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+ return null;
+ }
+
+ @Override
+ public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+ return 0;
+ }
+
+ @Override
+ public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+ return new byte[0];
+ }
+
+ @Override
+ public InputStream getObjectStream(String bucket, String path) {
+ return null;
+ }
+
+ @Override
+ public void write(String bucket, String path, byte[] data) {
+ // NYI: no-op stub
+ }
+
+ @Override
+ public void copy(String bucket, String srcPath, FileReference destPath) {
+ // NYI: no-op stub
+ }
+
+ @Override
+ public void deleteObject(String bucket, String path) {
+ // NYI: no-op stub
+ }
+
+ @Override
+ public long getObjectSize(String bucket, String path) {
+ return 0;
+ }
+
+ @Override
+ public boolean exists(String bucket, String path) {
+ return false;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/gcp/gcs/GCSCloudClient.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/gcp/gcs/GCSCloudClient.java
new file mode 100644
index 0000000..a9a6f06
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/cloud/clients/gcp/gcs/GCSCloudClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.cloud.clients.gcp.gcs;
+
+import java.io.FilenameFilter;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+
+public class GCSCloudClient implements ICloudClient {
+ // NYI placeholder: the constructor always throws, so the stub methods below are not reachable on an instance
+ public GCSCloudClient() {
+ throw new IllegalStateException("NYI");
+ }
+
+ @Override
+ public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
+ return null;
+ }
+
+ @Override
+ public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+ return null;
+ }
+
+ @Override
+ public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+ return 0;
+ }
+
+ @Override
+ public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+ return new byte[0];
+ }
+
+ @Override
+ public InputStream getObjectStream(String bucket, String path) {
+ return null;
+ }
+
+ @Override
+ public void write(String bucket, String path, byte[] data) {
+ // NYI: no-op stub
+ }
+
+ @Override
+ public void copy(String bucket, String srcPath, FileReference destPath) {
+ // NYI: no-op stub
+ }
+
+ @Override
+ public void deleteObject(String bucket, String path) {
+ // NYI: no-op stub
+ }
+
+ @Override
+ public long getObjectSize(String bucket, String path) {
+ return 0;
+ }
+
+ @Override
+ public boolean exists(String bucket, String path) {
+ return false;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/LSMTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/LSMTest.java
new file mode 100644
index 0000000..d62242f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/LSMTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.lsm;
+
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.control.nc.io.cloud.CloudResettableInputStream;
+import org.apache.hyracks.control.nc.io.cloud.WriteBufferProvider;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudBufferedWriter;
+import org.apache.hyracks.control.nc.io.cloud.clients.ICloudClient;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * Round-trip checks (write, list, read, delete) against the client stored in {@link #CLOUD_CLIENT}, which a
+ * concrete subclass assigns before the tests run. The tests execute in method-name order (a1..a4), so the object
+ * written by a1 is the one that is listed, read and finally deleted. Failures propagate to JUnit.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public abstract class LSMTest {
+    public static final Logger LOGGER = LogManager.getLogger();
+
+    public static final String BTREE_SUFFIX = "b";
+    public static final String PLAYGROUND_CONTAINER = "playground";
+    private final static String BUCKET_STORAGE_ROOT = "storage";
+    private static final int BUFFER_SIZE = 136 * 1024 + 5;
+
+    public static ICloudClient CLOUD_CLIENT;
+
+    /**
+     * Writes the generated pattern ten times through a buffered cloud writer; on failure the stream is aborted.
+     */
+    @Test
+    public void a1writeToS3Test() throws IOException {
+        CloudResettableInputStream stream = null;
+        try {
+            ICloudBufferedWriter s3BufferedWriter =
+                    CLOUD_CLIENT.createBufferedWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b");
+            stream = new CloudResettableInputStream(s3BufferedWriter, new WriteBufferProvider(1));
+            ByteBuffer content = createContent(BUFFER_SIZE);
+            int size = 0;
+            for (int i = 0; i < 10; i++) {
+                content.clear();
+                size += stream.write(content);
+            }
+            stream.finish();
+            LOGGER.info("Wrote {} bytes", size);
+        } catch (Exception e) {
+            if (stream != null) {
+                stream.abort();
+            }
+            // Rethrow with the cause attached: a swallowed exception would let a broken write pass
+            throw new IOException("Writing " + BUCKET_STORAGE_ROOT + "/0_b failed", e);
+        } finally {
+            if (stream != null) {
+                stream.close();
+            }
+        }
+    }
+
+    /** Lists the objects under the storage root whose names pass the B-tree suffix filter and logs them. */
+    @Test
+    public void a2listTest() throws Exception {
+        FilenameFilter btreeFilter = (dir, name) -> !name.startsWith(".") && name.endsWith(BTREE_SUFFIX);
+        LOGGER.info("Objects accepted by the filter: {}",
+                CLOUD_CLIENT.listObjects(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT, btreeFilter));
+    }
+
+    /**
+     * Reads one buffer from offset 4 * BUFFER_SIZE and checks each returned byte against the written pattern.
+     */
+    @Test
+    public void a3readFromS3Test() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+        long offset = BUFFER_SIZE * 4L;
+        int read = CLOUD_CLIENT.read(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b", offset, buffer);
+        // a1 writes the same BUFFER_SIZE-byte pattern repeatedly, so this offset is expected to hit pattern index 0
+        buffer.clear();
+        for (int i = 0; i < read; i++) {
+            org.junit.Assert.assertEquals("byte at index " + i, i % 127, buffer.get());
+        }
+    }
+
+    /** Deletes everything under the storage root; any exception fails the test. */
+    @Test
+    public void a4deleteTest() throws Exception {
+        CLOUD_CLIENT.deleteObject(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT);
+    }
+
+    /** Returns a buffer wrapping {@code size} bytes where the byte at index i is {@code i % 127}. */
+    private ByteBuffer createContent(int size) {
+        byte[] contentArray = new byte[size];
+        for (int i = 0; i < size; i++) {
+            contentArray[i] = (byte) (i % 127);
+        }
+        return ByteBuffer.wrap(contentArray);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/aws/s3/LSMS3Test.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/aws/s3/LSMS3Test.java
new file mode 100644
index 0000000..e9d0689
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/test/java/org/apache/hyracks/control/nc/lsm/aws/s3/LSMS3Test.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.lsm.aws.s3;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.control.nc.io.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.hyracks.control.nc.lsm.LSMTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import io.findify.s3mock.S3Mock;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+
+public class LSMS3Test extends LSMTest {
+    private static final int MOCK_SERVER_PORT = 8001;
+    private static final String MOCK_SERVER_HOSTNAME = "http://localhost:" + MOCK_SERVER_PORT;
+    private static final String MOCK_SERVER_REGION = "us-west-2"; // does not matter the value
+    private static S3Mock s3MockServer;
+    private static S3Client client;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        LOGGER.info("LSMS3Test setup");
+        LOGGER.info("Starting S3 mock server");
+        s3MockServer = new S3Mock.Builder().withPort(MOCK_SERVER_PORT).withInMemoryBackend().build();
+        s3MockServer.start();
+        LOGGER.info("S3 mock server started successfully");
+        // Plain SDK client aimed at the mock server; used here to drop and recreate the test bucket
+        LOGGER.info("Creating S3 client to load initial files to S3 mock server");
+        S3ClientBuilder mockClientBuilder = S3Client.builder().region(Region.of(MOCK_SERVER_REGION))
+                .credentialsProvider(AnonymousCredentialsProvider.create())
+                .endpointOverride(URI.create(MOCK_SERVER_HOSTNAME));
+        client = mockClientBuilder.build();
+        cleanup();
+        client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
+        LOGGER.info("Client created successfully");
+        CLOUD_CLIENT = new S3CloudClient(cloudClientConfiguration());
+    }
+
+    // Settings handed to S3CloudClient: placeholder credentials and region, endpoint set to the mock server
+    private static Map<String, String> cloudClientConfiguration() {
+        Map<String, String> settings = new HashMap<>();
+        settings.put("accessKeyId", "randomValue");
+        settings.put("secretAccessKey", "randomValue");
+        settings.put("region", "randomValue");
+        settings.put("endpoint", MOCK_SERVER_HOSTNAME);
+        return settings;
+    }
+
+    private static void cleanup() {
+        try {
+            client.deleteBucket(DeleteBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
+        } catch (Exception ex) {
+            // ignored: a failure to drop the bucket must not abort setup
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LOGGER.info("Shutting down S3 mock server and client");
+        if (client != null) {
+            client.close();
+        }
+        if (s3MockServer != null) {
+            s3MockServer.shutdown();
+        }
+        LOGGER.info("S3 mock down and client shut down successfully");
+    }
+}
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index 348b5bf..c1b8bfc 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -77,6 +77,7 @@
<jackson.version>2.14.1</jackson.version>
<jackson-databind.version>${jackson.version}</jackson-databind.version>
<netty.version>4.1.87.Final</netty.version>
+ <awsjavasdk.version>2.17.218</awsjavasdk.version>
<implementation.title>Apache Hyracks and Algebricks - ${project.name}</implementation.title>
<implementation.url>https://asterixdb.apache.org/</implementation.url>
@@ -480,6 +481,38 @@
<artifactId>jetty-util-ajax</artifactId>
<version>9.4.48.v20220622</version>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sdk-core</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>regions</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.findify</groupId>
+ <artifactId>s3mock_2.12</artifactId>
+ <version>0.2.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-http-core_2.12</artifactId>
+ <version>10.1.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
<build>