[ASTERIXDB-3495] Library Cloud Storage

Details:
- Use the cloud IO manager to persist library archives
- Unzip them onto local storage as part of library deployment

Change-Id: I2c173fbdc985ea590da5315c1ce18e7610fb6af0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18774
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
index 74beb46..76d91cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -20,6 +20,8 @@
 
 import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
 import static org.apache.asterix.common.library.LibraryDescriptor.FIELD_HASH;
+import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+import static org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor;
 
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -28,6 +30,8 @@
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.URI;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -50,6 +54,7 @@
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.library.LibraryDescriptor;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -60,6 +65,9 @@
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -223,6 +231,47 @@
         }
     }
 
+    private void writeLibToCloud(LibraryUploadData uploadData, Namespace libNamespace, String libName,
+            MessageDigest digest, ExternalFunctionLanguage language) throws IOException {
+        FileReference libDir = libraryManager.getLibraryDir(libNamespace, libName);
+        IIOManager cloudIoMgr = libraryManager.getCloudIOManager();
+        FileReference lib = libDir.getChild(ILibraryManager.LIBRARY_ARCHIVE_NAME);
+        if (!libDir.getFile().exists()) {
+            Files.createDirectories(lib.getFile().toPath().getParent());
+        }
+        Files.createFile(lib.getFile().toPath());
+        IFileHandle fh = cloudIoMgr.open(lib, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        WritableByteChannel outChannel = cloudIoMgr.newWritableChannel(fh);
+        byte[] writeBuf = new byte[4096];
+        FileReference targetDescFile = libDir.getChild(DESCRIPTOR_FILE_NAME);
+        try (OutputStream outputStream = new DigestOutputStream(Channels.newOutputStream(outChannel), digest);
+                InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) {
+            IOUtils.copyLarge(ui, outputStream, writeBuf);
+            outputStream.flush();
+            cloudIoMgr.sync(fh, true);
+            writeDescriptor(libraryManager, targetDescFile,
+                    new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), true, writeBuf);
+        } finally {
+            cloudIoMgr.close(fh);
+        }
+    }
+
+    private URI cacheLibAndDistribute(LibraryUploadData uploadData, DataverseName libDv, String libName, String fileExt,
+            MessageDigest digest) throws Exception {
+        Path libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Created temporary file " + libraryTempFile + " for library " + libDv.getCanonicalForm() + "."
+                    + libName);
+        }
+        FileOutputStream libTmpOut = new FileOutputStream(libraryTempFile.toFile());
+        try (OutputStream os = new DigestOutputStream(libTmpOut, digest);
+                InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) {
+            IOUtils.copyLarge(ui, os);
+        }
+        return createDownloadURI(libraryTempFile);
+    }
+
     private void handleModification(IServletRequest request, IServletResponse response, LibraryOperation op) {
         HttpRequest httpRequest = request.getHttpRequest();
         Path libraryTempFile = null;
@@ -240,20 +289,16 @@
                 LibraryUploadData uploadData = decodeMultiPartLibraryOptions(requestDecoder);
                 ExternalFunctionLanguage language = uploadData.type;
                 String fileExt = FilenameUtils.getExtension(uploadData.fileUpload.getFilename());
-                libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt);
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Created temporary file " + libraryTempFile + " for library "
-                            + libDv.getCanonicalForm() + "." + libName);
-                }
                 MessageDigest digest = MessageDigest.getInstance("MD5");
-                libTmpOut = new FileOutputStream(libraryTempFile.toFile());
-                try (OutputStream os = new DigestOutputStream(libTmpOut, digest);
-                        InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) {
-                    IOUtils.copyLarge(ui, os);
+                if (appCtx.isCloudDeployment()) {
+                    writeLibToCloud(uploadData, libNamespace, libName, digest, language);
+                    doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), null,
+                            true, getSysAuthHeader(), requestReference, request);
+                } else {
+                    URI downloadURI = cacheLibAndDistribute(uploadData, libDv, libName, fileExt, digest);
+                    doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest),
+                            downloadURI, true, getSysAuthHeader(), requestReference, request);
                 }
-                URI downloadURI = createDownloadURI(libraryTempFile);
-                doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), downloadURI,
-                        true, getSysAuthHeader(), requestReference, request);
             } else if (op == LibraryOperation.DELETE) {
                 //DELETE semantics imply ifExists
                 doDrop(libNamespace, libName, false, requestReference, request);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 343baf0..1f08a5f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -335,8 +335,8 @@
 
         NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService();
         FileReference appDir =
-                ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
-        libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, ioManager);
+                persistenceIOManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getPath());
+        libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, persistenceIOManager);
         libraryManager.initialize(resetStorageData);
 
         /*
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e1b4bb0..f7a6dc1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3753,7 +3753,9 @@
             // #. create library artifacts in NCs.
             runJob(hcc, prepareJobSpec, jobFlags);
             prepareJobSuccessful = true;
-            runJob(hcc, commitJobSpec, jobFlags);
+            if (!appCtx.isCloudDeployment()) {
+                runJob(hcc, commitJobSpec, jobFlags);
+            }
 
             // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 912ca47..e4224f2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.cloud;
 
+import static org.apache.asterix.common.utils.StorageConstants.APPLICATION_ROOT_DIR_NAME;
 import static org.apache.asterix.common.utils.StorageConstants.METADATA_PARTITION;
 import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
 import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
@@ -28,6 +29,8 @@
 import java.nio.ByteBuffer;
 import java.nio.file.FileStore;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -40,6 +43,7 @@
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
 import org.apache.asterix.cloud.util.CloudFileUtil;
 import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.cloud.IPartitionBootstrapper;
@@ -51,6 +55,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOBulkOperation;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
@@ -146,6 +151,7 @@
 
         // Has different implementations depending on the caching policy
         downloadPartitions(metadataNode, metadataPartition);
+        downloadAllLibraries();
     }
 
     private void deleteUnkeptPartitionDirs(List<FileReference> currentOnDiskPartitions) throws HyracksDataException {
@@ -181,6 +187,23 @@
 
     protected abstract Set<UncachedFileReference> getUncachedFiles();
 
+    @Override
+    public void downloadLibrary(Collection<FileReference> libPath) throws HyracksDataException {
+        IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
+        LOGGER.info("Downloading all files located in {}", libPath);
+        downloader.downloadDirectories(libPath);
+        LOGGER.info("Finished downloading {}", libPath);
+    }
+
+    public void downloadAllLibraries() throws HyracksDataException {
+        IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
+        FileReference appDir = resolveAbsolutePath(
+                localIoManager.getWorkspacePath(0).getPath() + File.separator + APPLICATION_ROOT_DIR_NAME);
+        LOGGER.info("Downloading all libraries in + {}", appDir);
+        downloader.downloadDirectories(Collections.singletonList(appDir));
+        LOGGER.info("Finished downloading all libraries");
+    }
+
     /*
      * ******************************************************************
      * ICloudIOManager functions
@@ -495,4 +518,9 @@
     public long getTotalDiskUsage() {
         return PhysicalDrive.getUsedSpace(drivePaths);
     }
+
+    @Override
+    public IIOManager getLocalIOManager() {
+        return localIoManager;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
index 2098060..db87c60 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
@@ -33,11 +33,14 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public interface ILibraryManager {
 
+    String LIBRARY_ARCHIVE_NAME = "library_archive.zip";
+
     List<Pair<Namespace, String>> getLibraryListing() throws IOException;
 
     String getLibraryHash(Namespace namespace, String libraryName) throws IOException;
@@ -70,7 +73,12 @@
 
     void unzip(FileReference sourceFile, FileReference outputDir) throws IOException;
 
-    void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuf) throws IOException;
+    void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer, IIOManager localIoManager)
+            throws IOException;
 
     void setUploadClient(Function<ILibraryManager, CloseableHttpClient> f);
+
+    void writeShim(FileReference outputFile, byte[] copyBuf) throws IOException;
+
+    IIOManager getCloudIOManager();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index bd67b11..627f170 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.utils;
 
+import static org.apache.hyracks.control.common.context.ServerContext.APP_DIR_NAME;
+
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -33,6 +35,7 @@
     public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs";
     public static final String GLOBAL_TXN_DIR_NAME = ".";
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
+    public static final String APPLICATION_ROOT_DIR_NAME = APP_DIR_NAME;
     public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
     public static final String PARTITION_DIR_PREFIX = "partition_";
     /**
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 300e723..de236ba 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -599,6 +599,11 @@
       <artifactId>avro-mapred</artifactId>
       <version>1.12.0</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-cloud</artifactId>
+      <version>0.3.10-SNAPSHOT</version>
+    </dependency>
   </dependencies>
   <!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
   <repositories>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index 47a1768..f9dc118 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -20,7 +20,10 @@
 
 import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
 import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.ENTRYPOINT;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -93,6 +96,7 @@
 import org.apache.hyracks.api.network.INetworkSecurityConfig;
 import org.apache.hyracks.api.network.INetworkSecurityManager;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.ipc.impl.IPCSystem;
@@ -123,7 +127,7 @@
 
     public static final String CONTENTS_DIR_NAME = "contents";
 
-    public static final String DESCRIPTOR_FILE_NAME = "lib.json";
+    public static final String DESCRIPTOR_FILE_NAME = "desc.json";
 
     public static final String DISTRIBUTION_DIR = "dist";
 
@@ -140,13 +144,13 @@
     private final FileReference trashDir;
     private final FileReference distDir;
     private final Path trashDirPath;
-    //TODO(DB): change for database
     private final Map<Pair<Namespace, String>, ILibrary> libraries = new HashMap<>();
     private IPCSystem pythonIPC;
     private final ExternalFunctionResultRouter router;
     private final IIOManager ioManager;
     private final INamespacePathResolver namespacePathResolver;
     private final boolean sslEnabled;
+    private final boolean cloudMode;
     private Function<ILibraryManager, CloseableHttpClient> uploadClientSupp;
 
     public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir,
@@ -165,6 +169,7 @@
         this.sslEnabled = ncs.getConfiguration().isSslEnabled();
         this.ioManager = ioManager;
         uploadClientSupp = ExternalLibraryManager::defaultHttpClient;
+        cloudMode = ncs.getConfiguration().isCloudDeployment();
     }
 
     public void initialize(boolean resetStorageData) throws HyracksDataException {
@@ -216,6 +221,13 @@
 
     @Override
     public void start() {
+        if (cloudMode) {
+            try {
+                unzipAllLibs(baseDir);
+            } catch (IOException e) {
+                LOGGER.error("Failed to unzip all libraries", e);
+            }
+        }
     }
 
     @Override
@@ -470,17 +482,47 @@
         return outZip;
     }
 
+    private void unzipAllLibs(FileReference libDir) throws IOException {
+        byte[] copyBuf = new byte[4096];
+        Files.walkFileTree(libDir.getFile().toPath(), new SimpleFileVisitor<>() {
+            @Override
+            public FileVisitResult visitFile(Path currPath, BasicFileAttributes attrs) throws IOException {
+                if (currPath.getFileName().toString().equals(LIBRARY_ARCHIVE_NAME)) {
+                    FileReference lib = ioManager.resolveAbsolutePath(currPath.toString());
+                    FileReference content = lib.getParent().getChild(REV_1_DIR_NAME).getChild(CONTENTS_DIR_NAME);
+                    if (!content.getFile().exists()) {
+                        FileUtils.forceMkdir(content.getFile());
+                    }
+                    unzip(lib, content);
+                    writeShim(content.getChild(ENTRYPOINT), copyBuf);
+                } else if (currPath.getFileName().toString().equals(DESCRIPTOR_FILE_NAME)) {
+                    Path revDir = currPath.resolveSibling(REV_1_DIR_NAME);
+                    if (!revDir.toFile().exists()) {
+                        FileUtils.forceMkdir(revDir.toFile());
+                    }
+                    Files.copy(currPath, currPath.resolveSibling(REV_1_DIR_NAME).resolve(DESCRIPTOR_FILE_NAME),
+                            REPLACE_EXISTING);
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
     @Override
     public void dropLibraryPath(FileReference fileRef) throws HyracksDataException {
-        // does not flush any directories
         try {
             Path path = fileRef.getFile().toPath();
-            Path trashPath = Files.createTempDirectory(trashDirPath, null);
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Drop (move) {} into {}", path, trashPath);
+            if (ncs.getConfiguration().isCloudDeployment()) {
+                ioManager.delete(fileRef.getChild(LIBRARY_ARCHIVE_NAME));
+                ioManager.delete(fileRef.getChild(DESCRIPTOR_FILE_NAME));
+            } else {
+                Path trashPath = Files.createTempDirectory(trashDirPath, null);
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Drop (move) {} into {}", path, trashPath);
+                }
+                Files.move(path, trashPath, StandardCopyOption.ATOMIC_MOVE);
+                ncs.getWorkQueue().schedule(new DeleteDirectoryWork(trashPath));
             }
-            Files.move(path, trashPath, StandardCopyOption.ATOMIC_MOVE);
-            ncs.getWorkQueue().schedule(new DeleteDirectoryWork(trashPath));
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -614,6 +656,10 @@
     @Override
     public void unzip(FileReference sourceFile, FileReference outputDir) throws IOException {
         boolean logTraceEnabled = LOGGER.isTraceEnabled();
+        IIOManager localIoManager = ioManager;
+        if (ncs.getConfiguration().isCloudDeployment()) {
+            localIoManager = ((ICloudIOManager) ioManager).getLocalIOManager();
+        }
         Set<Path> newDirs = new HashSet<>();
         Path outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize();
         try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) {
@@ -635,11 +681,11 @@
                     newDirs.add(p);
                 }
                 try (InputStream in = zipFile.getInputStream(entry)) {
-                    FileReference entryOutputFileRef = ioManager.resolveAbsolutePath(entryOutputPath.toString());
+                    FileReference entryOutputFileRef = localIoManager.resolveAbsolutePath(entryOutputPath.toString());
                     if (logTraceEnabled) {
                         LOGGER.trace("Extracting file {}", entryOutputFileRef);
                     }
-                    writeAndForce(entryOutputFileRef, in, writeBuf);
+                    writeAndForce(entryOutputFileRef, in, writeBuf, localIoManager);
                 }
             }
         }
@@ -649,17 +695,18 @@
     }
 
     @Override
-    public void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer) throws IOException {
+    public void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer,
+            IIOManager localIoManager) throws IOException {
         outputFile.getFile().createNewFile();
-        IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE,
+        IFileHandle fHandle = localIoManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
+        WritableByteChannel outChannel = localIoManager.newWritableChannel(fHandle);
         try (OutputStream outputStream = Channels.newOutputStream(outChannel)) {
             IOUtils.copyLarge(dataStream, outputStream, copyBuffer);
             outputStream.flush();
-            ioManager.sync(fHandle, true);
+            localIoManager.sync(fHandle, true);
         } finally {
-            ioManager.close(fHandle);
+            localIoManager.close(fHandle);
         }
     }
 
@@ -696,4 +743,33 @@
         }
     }
 
+    @Override
+    public void writeShim(FileReference outputFile, byte[] copyBuf) throws IOException {
+        InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName());
+        if (is == null) {
+            throw new IOException("Classpath does not contain necessary Python resources!");
+        }
+        try {
+            if (ncs.getConfiguration().isCloudDeployment()) {
+                writeAndForce(outputFile, is, copyBuf, ((ICloudIOManager) ioManager).getLocalIOManager());
+            } else {
+                writeAndForce(outputFile, is, copyBuf, ioManager);
+            }
+        } finally {
+            is.close();
+        }
+    }
+
+    @Override
+    public IIOManager getCloudIOManager() {
+        return ioManager;
+    }
+
+    public static void writeDescriptor(ILibraryManager libraryManager, FileReference descFile, LibraryDescriptor desc,
+            boolean cloud, byte[] copyBuf) throws IOException {
+        byte[] bytes = libraryManager.serializeLibraryDescriptor(desc);
+        libraryManager.writeAndForce(descFile, new ByteArrayInputStream(bytes), copyBuf,
+                libraryManager.getCloudIOManager());
+    }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
index b254836..e9c7d72 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 
@@ -61,10 +62,14 @@
 
         protected IIOManager ioManager;
 
+        protected ICloudIOManager cloudIoManager;
+
         protected ILibraryManager libraryManager;
 
         private FileReference libraryDir;
 
+        protected boolean cloudMode = false;
+
         protected AbstractLibraryNodePushable(IHyracksTaskContext ctx) {
             this.ctx = ctx;
         }
@@ -75,9 +80,13 @@
         public final void initialize() throws HyracksDataException {
             INcApplicationContext runtimeCtx =
                     (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
-            ioManager = runtimeCtx.getIoManager();
+            ioManager = runtimeCtx.getPersistenceIoManager();
             libraryManager = runtimeCtx.getLibraryManager();
             libraryDir = libraryManager.getLibraryDir(namespace, libraryName);
+            if (runtimeCtx.isCloudDeployment()) {
+                cloudMode = true;
+                cloudIoManager = (ICloudIOManager) ioManager;
+            }
             try {
                 execute();
             } catch (IOException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
index 0c12d43..638444d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -19,22 +19,26 @@
 
 package org.apache.asterix.external.operators;
 
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.asterix.common.library.ILibraryManager.LIBRARY_ARCHIVE_NAME;
+import static org.apache.asterix.external.library.ExternalLibraryManager.CONTENTS_DIR_NAME;
 import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+import static org.apache.asterix.external.library.ExternalLibraryManager.REV_1_DIR_NAME;
+import static org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor;
+import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.ENTRYPOINT;
 import static org.apache.hyracks.control.common.controllers.NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.MessageDigest;
+import java.util.Collections;
 
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.library.LibraryDescriptor;
 import org.apache.asterix.common.metadata.Namespace;
-import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.external.util.ExternalLibraryUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -48,7 +52,7 @@
 
 public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private static final Logger LOGGER = LogManager.getLogger(LibraryDeployPrepareOperatorDescriptor.class);
 
@@ -71,13 +75,30 @@
 
             private final byte[] copyBuf = new byte[4096];
 
+            private void cloudDeploy() throws IOException {
+                FileReference libDir = getLibraryDir();
+                libDir = new FileReference(libDir.getDeviceHandle(), FilenameUtils.normalize(libDir.getRelativePath()));
+                cloudIoManager.downloadLibrary(Collections.singletonList(libDir));
+                FileReference content = libDir.getChild(REV_1_DIR_NAME).getChild(CONTENTS_DIR_NAME);
+                libraryManager.unzip(libDir.getChild(LIBRARY_ARCHIVE_NAME), content);
+                libraryManager.writeShim(content.getChild(ENTRYPOINT), copyBuf);
+                Files.copy(libDir.getChild(DESCRIPTOR_FILE_NAME).getFile().toPath(),
+                        content.getParent().getChild(DESCRIPTOR_FILE_NAME).getFile().toPath(), REPLACE_EXISTING);
+            }
+
             @Override
             protected void execute() throws IOException {
                 if (LOGGER.isInfoEnabled()) {
                     LOGGER.info("Prepare deployment of library {}.{}", namespace, libraryName);
                 }
 
-                // #. create library dir if necessary, clean 'stage' dir
+                if (libLocation == null && cloudMode) {
+                    cloudDeploy();
+                    return;
+                }
+
+                //#. create library dir if necessary, clean 'stage' dir
+
                 FileReference libDir = getLibraryDir();
                 Path libDirPath = libDir.getFile().toPath();
 
@@ -121,7 +142,7 @@
                 }
                 MessageDigest digest = libraryManager.download(targetFile, authToken, libLocation);
                 // extract from the archive
-                FileReference contentsDir = stageDir.getChild(ExternalLibraryManager.CONTENTS_DIR_NAME);
+                FileReference contentsDir = stageDir.getChild(CONTENTS_DIR_NAME);
                 mkdir(contentsDir);
 
                 if (LOGGER.isDebugEnabled()) {
@@ -147,8 +168,9 @@
                 if (LOGGER.isTraceEnabled()) {
                     LOGGER.trace("Writing library descriptor into {}", targetDescFile);
                 }
-                writeDescriptor(targetDescFile,
-                        new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)));
+                writeDescriptor(libraryManager, targetDescFile,
+                        new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), false,
+                        copyBuf);
 
                 flushDirectory(contentsDir);
                 flushDirectory(stageDir);
@@ -158,7 +180,7 @@
                     boolean writeMsgpack) throws IOException {
                 FileReference msgpack = stageDir.getChild("msgpack.pyz");
                 if (writeMsgpack) {
-                    writeShim(msgpack);
+                    libraryManager.writeShim(msgpack, copyBuf);
                     File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
                     FileReference msgPackFolderRef =
                             new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
@@ -166,24 +188,7 @@
                     Files.delete(msgpack.getFile().toPath());
                 }
                 libraryManager.unzip(sourceFile, contentsDir);
-                writeShim(contentsDir.getChild("entrypoint.py"));
-            }
-
-            private void writeShim(FileReference outputFile) throws IOException {
-                InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName());
-                if (is == null) {
-                    throw new IOException("Classpath does not contain necessary Python resources!");
-                }
-                try {
-                    libraryManager.writeAndForce(outputFile, is, copyBuf);
-                } finally {
-                    is.close();
-                }
-            }
-
-            private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException {
-                byte[] bytes = libraryManager.serializeLibraryDescriptor(desc);
-                libraryManager.writeAndForce(descFile, new ByteArrayInputStream(bytes), copyBuf);
+                libraryManager.writeShim(contentsDir.getChild(ENTRYPOINT), copyBuf);
             }
 
         };
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 9b6a2ff..62e0829 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -19,9 +19,12 @@
 package org.apache.hyracks.cloud.io;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
 import org.apache.hyracks.cloud.io.request.ICloudRequest;
 import org.apache.hyracks.cloud.io.stream.CloudInputStream;
@@ -32,6 +35,8 @@
  * file operations in a cloud deployment.
  */
 public interface ICloudIOManager {
+    void downloadLibrary(Collection<FileReference> libPath) throws HyracksDataException;
+
     /**
      * Read from the cloud
      *
@@ -105,4 +110,6 @@
      * @param resourcePath to evict
      */
     void evict(String resourcePath) throws HyracksDataException;
+
+    IIOManager getLocalIOManager();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 960f23b..37e1477 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -382,7 +382,7 @@
     }
 
     private File getWorkspaceFolder(IODeviceHandle dev) {
-        return new File(dev.getMount(), dev.getWorkspace());
+        return Path.of(dev.getMount().getPath(), dev.getWorkspace()).normalize().toFile();
     }
 
     @Override
@@ -491,8 +491,12 @@
 
             @Override
             public int write(ByteBuffer src) throws IOException {
+                int origPos = src.position();
                 int written = IOManager.this.syncWrite(fHandle, position, src);
                 position += written;
+                if (src.position() < origPos + written) {
+                    src.position(origPos + written);
+                }
                 return written;
             }