[ASTERIXDB-3495] Library Cloud Storage
Details:
- Use the cloud IO manager to persist library archives
- Unzip them onto local storage as part of library deployment
Change-Id: I2c173fbdc985ea590da5315c1ce18e7610fb6af0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18774
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
index 74beb46..76d91cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -20,6 +20,8 @@
import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
import static org.apache.asterix.common.library.LibraryDescriptor.FIELD_HASH;
+import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+import static org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -28,6 +30,8 @@
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -50,6 +54,7 @@
import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.library.LibraryDescriptor;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -60,6 +65,9 @@
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -223,6 +231,47 @@
}
}
+    /**
+     * Streams the uploaded library archive into the library directory through the IO manager returned by
+     * {@code libraryManager.getCloudIOManager()}, updating {@code digest} with every archive byte, and then
+     * writes the library descriptor (language and hex digest) next to the archive.
+     *
+     * @param uploadData   decoded multipart upload holding the archive bytes
+     * @param libNamespace namespace of the library
+     * @param libName      name of the library
+     * @param digest       digest updated while the archive is written
+     * @param language     language recorded in the descriptor
+     * @throws IOException if creating, writing, syncing or closing any of the files fails
+     */
+    private void writeLibToCloud(LibraryUploadData uploadData, Namespace libNamespace, String libName,
+            MessageDigest digest, ExternalFunctionLanguage language) throws IOException {
+        FileReference libDir = libraryManager.getLibraryDir(libNamespace, libName);
+        IIOManager cloudIoMgr = libraryManager.getCloudIOManager();
+        FileReference lib = libDir.getChild(ILibraryManager.LIBRARY_ARCHIVE_NAME);
+        if (!libDir.getFile().exists()) {
+            Files.createDirectories(lib.getFile().toPath().getParent());
+        }
+        // NOTE(review): createFile throws FileAlreadyExistsException when the archive is already present
+        // locally; confirm this is intended when an existing library is uploaded again.
+        Files.createFile(lib.getFile().toPath());
+        FileReference targetDescFile = libDir.getChild(DESCRIPTOR_FILE_NAME);
+        byte[] writeBuf = new byte[4096];
+        IFileHandle fh = cloudIoMgr.open(lib, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        // Everything after open() runs inside try/finally so the handle is released even when creating the
+        // channel (and not only the copy itself) fails.
+        try {
+            WritableByteChannel outChannel = cloudIoMgr.newWritableChannel(fh);
+            try (OutputStream outputStream = new DigestOutputStream(Channels.newOutputStream(outChannel), digest);
+                    InputStream ui = new ByteBufInputStream(uploadData.fileUpload.getByteBuf())) {
+                IOUtils.copyLarge(ui, outputStream, writeBuf);
+                outputStream.flush();
+                cloudIoMgr.sync(fh, true);
+                // the descriptor is written only after the archive has been synced
+                writeDescriptor(libraryManager, targetDescFile,
+                        new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), true,
+                        writeBuf);
+            }
+        } finally {
+            cloudIoMgr.close(fh);
+        }
+    }
+
+    /**
+     * Copies the uploaded archive into a new temporary file under {@code workingDir}, passing every byte
+     * through {@code digest}, and returns the download URI created for that file. The temporary file is not
+     * deleted by this method.
+     *
+     * @param uploadData decoded multipart upload holding the archive bytes
+     * @param libDv      dataverse of the library (used for the debug message only)
+     * @param libName    name of the library (used for the debug message only)
+     * @param fileExt    extension given to the temporary file
+     * @param digest     digest updated while the archive is copied
+     * @return the URI produced by {@code createDownloadURI} for the temporary file
+     */
+    private URI cacheLibAndDistribute(LibraryUploadData uploadData, DataverseName libDv, String libName, String fileExt,
+            MessageDigest digest) throws Exception {
+        final Path tmpArchive = Files.createTempFile(workingDir, "lib_", '.' + fileExt);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Created temporary file " + tmpArchive + " for library " + libDv.getCanonicalForm() + "."
+                    + libName);
+        }
+        try (OutputStream digestingOut = new DigestOutputStream(new FileOutputStream(tmpArchive.toFile()), digest);
+                InputStream uploaded = new ByteBufInputStream(uploadData.fileUpload.getByteBuf())) {
+            IOUtils.copyLarge(uploaded, digestingOut);
+        }
+        return createDownloadURI(tmpArchive);
+    }
+
private void handleModification(IServletRequest request, IServletResponse response, LibraryOperation op) {
HttpRequest httpRequest = request.getHttpRequest();
Path libraryTempFile = null;
@@ -240,20 +289,16 @@
LibraryUploadData uploadData = decodeMultiPartLibraryOptions(requestDecoder);
ExternalFunctionLanguage language = uploadData.type;
String fileExt = FilenameUtils.getExtension(uploadData.fileUpload.getFilename());
- libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Created temporary file " + libraryTempFile + " for library "
- + libDv.getCanonicalForm() + "." + libName);
- }
MessageDigest digest = MessageDigest.getInstance("MD5");
- libTmpOut = new FileOutputStream(libraryTempFile.toFile());
- try (OutputStream os = new DigestOutputStream(libTmpOut, digest);
- InputStream ui = new ByteBufInputStream((uploadData.fileUpload).getByteBuf())) {
- IOUtils.copyLarge(ui, os);
+ if (appCtx.isCloudDeployment()) {
+ writeLibToCloud(uploadData, libNamespace, libName, digest, language);
+ doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), null,
+ true, getSysAuthHeader(), requestReference, request);
+ } else {
+ URI downloadURI = cacheLibAndDistribute(uploadData, libDv, libName, fileExt, digest);
+ doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest),
+ downloadURI, true, getSysAuthHeader(), requestReference, request);
}
- URI downloadURI = createDownloadURI(libraryTempFile);
- doCreate(libNamespace, libName, language, ExternalLibraryUtils.digestToHexString(digest), downloadURI,
- true, getSysAuthHeader(), requestReference, request);
} else if (op == LibraryOperation.DELETE) {
//DELETE semantics imply ifExists
doDrop(libNamespace, libName, false, requestReference, request);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 343baf0..1f08a5f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -335,8 +335,8 @@
NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService();
FileReference appDir =
- ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
- libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, ioManager);
+ persistenceIOManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getPath());
+ libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, persistenceIOManager);
libraryManager.initialize(resetStorageData);
/*
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e1b4bb0..f7a6dc1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3753,7 +3753,9 @@
// #. create library artifacts in NCs.
runJob(hcc, prepareJobSpec, jobFlags);
prepareJobSuccessful = true;
- runJob(hcc, commitJobSpec, jobFlags);
+ if (!appCtx.isCloudDeployment()) {
+ runJob(hcc, commitJobSpec, jobFlags);
+ }
// #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 912ca47..e4224f2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.cloud;
+import static org.apache.asterix.common.utils.StorageConstants.APPLICATION_ROOT_DIR_NAME;
import static org.apache.asterix.common.utils.StorageConstants.METADATA_PARTITION;
import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
@@ -28,6 +29,8 @@
import java.nio.ByteBuffer;
import java.nio.file.FileStore;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -40,6 +43,7 @@
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.cloud.util.CloudFileUtil;
import org.apache.asterix.common.api.INamespacePathResolver;
import org.apache.asterix.common.cloud.IPartitionBootstrapper;
@@ -51,6 +55,7 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOBulkOperation;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
@@ -146,6 +151,7 @@
// Has different implementations depending on the caching policy
downloadPartitions(metadataNode, metadataPartition);
+ downloadAllLibraries();
}
private void deleteUnkeptPartitionDirs(List<FileReference> currentOnDiskPartitions) throws HyracksDataException {
@@ -181,6 +187,23 @@
protected abstract Set<UncachedFileReference> getUncachedFiles();
+    /**
+     * Downloads the given library directories with a parallel downloader created for this manager's bucket
+     * and local IO manager.
+     *
+     * @param libPath directories to download
+     * @throws HyracksDataException if creating the downloader or the download itself fails
+     */
+    @Override
+    public void downloadLibrary(Collection<FileReference> libPath) throws HyracksDataException {
+        final IParallelDownloader parallelDownloader = cloudClient.createParallelDownloader(bucket, localIoManager);
+        LOGGER.info("Downloading all files located in {}", libPath);
+        parallelDownloader.downloadDirectories(libPath);
+        LOGGER.info("Finished downloading {}", libPath);
+    }
+
+    /**
+     * Downloads the application root directory ({@code APPLICATION_ROOT_DIR_NAME}) located under the local
+     * IO manager's workspace path at index 0, i.e. the directory the log messages refer to as holding all
+     * libraries.
+     *
+     * @throws HyracksDataException if resolving the directory, creating the downloader or downloading fails
+     */
+    public void downloadAllLibraries() throws HyracksDataException {
+        IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
+        FileReference appDir = resolveAbsolutePath(
+                localIoManager.getWorkspacePath(0).getPath() + File.separator + APPLICATION_ROOT_DIR_NAME);
+        // the placeholder alone is enough; the previous message carried a stray "+ " left over from concatenation
+        LOGGER.info("Downloading all libraries in {}", appDir);
+        downloader.downloadDirectories(Collections.singletonList(appDir));
+        LOGGER.info("Finished downloading all libraries");
+    }
+
/*
* ******************************************************************
* ICloudIOManager functions
@@ -495,4 +518,9 @@
public long getTotalDiskUsage() {
return PhysicalDrive.getUsedSpace(drivePaths);
}
+
+ /**
+ * @return the {@code localIoManager} held by this cloud IO manager
+ */
+ @Override
+ public IIOManager getLocalIOManager() {
+ return localIoManager;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
index 2098060..db87c60 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
@@ -33,11 +33,14 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.ipc.impl.IPCSystem;
public interface ILibraryManager {
+ String LIBRARY_ARCHIVE_NAME = "library_archive.zip";
+
List<Pair<Namespace, String>> getLibraryListing() throws IOException;
String getLibraryHash(Namespace namespace, String libraryName) throws IOException;
@@ -70,7 +73,12 @@
void unzip(FileReference sourceFile, FileReference outputDir) throws IOException;
- void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuf) throws IOException;
+ void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer, IIOManager localIoManager)
+ throws IOException;
void setUploadClient(Function<ILibraryManager, CloseableHttpClient> f);
+
+ /**
+ * Writes a shim file to {@code outputFile}
+ *
+ * @param outputFile the file to write
+ * @param copyBuf buffer used while copying
+ * @throws IOException if the shim cannot be written
+ */
+ void writeShim(FileReference outputFile, byte[] copyBuf) throws IOException;
+
+ /**
+ * @return the IO manager callers should use to write library files in a cloud deployment
+ * (NOTE(review): presumably the cloud-backed manager; confirm against implementations)
+ */
+ IIOManager getCloudIOManager();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index bd67b11..627f170 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.utils;
+import static org.apache.hyracks.control.common.context.ServerContext.APP_DIR_NAME;
+
import java.util.LinkedHashMap;
import java.util.Map;
@@ -33,6 +35,7 @@
public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs";
public static final String GLOBAL_TXN_DIR_NAME = ".";
public static final String STORAGE_ROOT_DIR_NAME = "storage";
+ public static final String APPLICATION_ROOT_DIR_NAME = APP_DIR_NAME;
public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
public static final String PARTITION_DIR_PREFIX = "partition_";
/**
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 300e723..de236ba 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -599,6 +599,11 @@
<artifactId>avro-mapred</artifactId>
<version>1.12.0</version>
</dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-cloud</artifactId>
+      <!-- follow the project-wide Hyracks version instead of pinning a SNAPSHOT that goes stale on version bumps -->
+      <version>${hyracks.version}</version>
+    </dependency>
</dependencies>
<!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
<repositories>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index 47a1768..f9dc118 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -20,7 +20,10 @@
import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.ENTRYPOINT;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -93,6 +96,7 @@
import org.apache.hyracks.api.network.INetworkSecurityConfig;
import org.apache.hyracks.api.network.INetworkSecurityManager;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.ipc.impl.IPCSystem;
@@ -123,7 +127,7 @@
public static final String CONTENTS_DIR_NAME = "contents";
- public static final String DESCRIPTOR_FILE_NAME = "lib.json";
+ public static final String DESCRIPTOR_FILE_NAME = "desc.json";
public static final String DISTRIBUTION_DIR = "dist";
@@ -140,13 +144,13 @@
private final FileReference trashDir;
private final FileReference distDir;
private final Path trashDirPath;
- //TODO(DB): change for database
private final Map<Pair<Namespace, String>, ILibrary> libraries = new HashMap<>();
private IPCSystem pythonIPC;
private final ExternalFunctionResultRouter router;
private final IIOManager ioManager;
private final INamespacePathResolver namespacePathResolver;
private final boolean sslEnabled;
+ private final boolean cloudMode;
private Function<ILibraryManager, CloseableHttpClient> uploadClientSupp;
public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir,
@@ -165,6 +169,7 @@
this.sslEnabled = ncs.getConfiguration().isSslEnabled();
this.ioManager = ioManager;
uploadClientSupp = ExternalLibraryManager::defaultHttpClient;
+ cloudMode = ncs.getConfiguration().isCloudDeployment();
}
public void initialize(boolean resetStorageData) throws HyracksDataException {
@@ -216,6 +221,13 @@
@Override
public void start() {
+ if (cloudMode) {
+ try {
+ unzipAllLibs(baseDir);
+ } catch (IOException e) {
+ LOGGER.error("Failed to unzip all libraries", e);
+ }
+ }
}
@Override
@@ -470,17 +482,47 @@
return outZip;
}
+    /**
+     * Walks the file tree rooted at {@code libDir}. For every file named {@code LIBRARY_ARCHIVE_NAME} the
+     * archive is unzipped into the sibling {@code REV_1_DIR_NAME/CONTENTS_DIR_NAME} directory (created when
+     * missing) and the entrypoint shim is written there; for every file named {@code DESCRIPTOR_FILE_NAME}
+     * the descriptor is copied into the sibling {@code REV_1_DIR_NAME} directory, replacing any existing copy.
+     *
+     * @param libDir root directory to scan
+     * @throws IOException if walking, unzipping or copying fails
+     */
+    private void unzipAllLibs(FileReference libDir) throws IOException {
+        final byte[] shimCopyBuf = new byte[4096];
+        Files.walkFileTree(libDir.getFile().toPath(), new SimpleFileVisitor<>() {
+            @Override
+            public FileVisitResult visitFile(Path visited, BasicFileAttributes attrs) throws IOException {
+                final String fileName = visited.getFileName().toString();
+                if (LIBRARY_ARCHIVE_NAME.equals(fileName)) {
+                    FileReference archive = ioManager.resolveAbsolutePath(visited.toString());
+                    FileReference contentsDir =
+                            archive.getParent().getChild(REV_1_DIR_NAME).getChild(CONTENTS_DIR_NAME);
+                    if (!contentsDir.getFile().exists()) {
+                        FileUtils.forceMkdir(contentsDir.getFile());
+                    }
+                    unzip(archive, contentsDir);
+                    writeShim(contentsDir.getChild(ENTRYPOINT), shimCopyBuf);
+                } else if (DESCRIPTOR_FILE_NAME.equals(fileName)) {
+                    Path revDir = visited.resolveSibling(REV_1_DIR_NAME);
+                    if (!revDir.toFile().exists()) {
+                        FileUtils.forceMkdir(revDir.toFile());
+                    }
+                    Files.copy(visited, revDir.resolve(DESCRIPTOR_FILE_NAME), REPLACE_EXISTING);
+                }
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
@Override
public void dropLibraryPath(FileReference fileRef) throws HyracksDataException {
- // does not flush any directories
try {
Path path = fileRef.getFile().toPath();
- Path trashPath = Files.createTempDirectory(trashDirPath, null);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Drop (move) {} into {}", path, trashPath);
+ if (ncs.getConfiguration().isCloudDeployment()) {
+ ioManager.delete(fileRef.getChild(LIBRARY_ARCHIVE_NAME));
+ ioManager.delete(fileRef.getChild(DESCRIPTOR_FILE_NAME));
+ } else {
+ Path trashPath = Files.createTempDirectory(trashDirPath, null);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Drop (move) {} into {}", path, trashPath);
+ }
+ Files.move(path, trashPath, StandardCopyOption.ATOMIC_MOVE);
+ ncs.getWorkQueue().schedule(new DeleteDirectoryWork(trashPath));
}
- Files.move(path, trashPath, StandardCopyOption.ATOMIC_MOVE);
- ncs.getWorkQueue().schedule(new DeleteDirectoryWork(trashPath));
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -614,6 +656,10 @@
@Override
public void unzip(FileReference sourceFile, FileReference outputDir) throws IOException {
boolean logTraceEnabled = LOGGER.isTraceEnabled();
+ IIOManager localIoManager = ioManager;
+ if (ncs.getConfiguration().isCloudDeployment()) {
+ localIoManager = ((ICloudIOManager) ioManager).getLocalIOManager();
+ }
Set<Path> newDirs = new HashSet<>();
Path outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize();
try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) {
@@ -635,11 +681,11 @@
newDirs.add(p);
}
try (InputStream in = zipFile.getInputStream(entry)) {
- FileReference entryOutputFileRef = ioManager.resolveAbsolutePath(entryOutputPath.toString());
+ FileReference entryOutputFileRef = localIoManager.resolveAbsolutePath(entryOutputPath.toString());
if (logTraceEnabled) {
LOGGER.trace("Extracting file {}", entryOutputFileRef);
}
- writeAndForce(entryOutputFileRef, in, writeBuf);
+ writeAndForce(entryOutputFileRef, in, writeBuf, localIoManager);
}
}
}
@@ -649,17 +695,18 @@
}
@Override
- public void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer) throws IOException {
+ public void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer,
+ IIOManager localIoManager) throws IOException {
outputFile.getFile().createNewFile();
- IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE,
+ IFileHandle fHandle = localIoManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
+ WritableByteChannel outChannel = localIoManager.newWritableChannel(fHandle);
try (OutputStream outputStream = Channels.newOutputStream(outChannel)) {
IOUtils.copyLarge(dataStream, outputStream, copyBuffer);
outputStream.flush();
- ioManager.sync(fHandle, true);
+ localIoManager.sync(fHandle, true);
} finally {
- ioManager.close(fHandle);
+ localIoManager.close(fHandle);
}
}
@@ -696,4 +743,33 @@
}
}
+    /**
+     * Copies the classpath resource whose name equals {@code outputFile}'s file name into {@code outputFile}.
+     * In cloud mode the file is written through the local IO manager of the cloud IO manager, otherwise
+     * through this manager's own IO manager.
+     *
+     * @param outputFile destination file; its file name is also the name of the classpath resource
+     * @param copyBuf    buffer used while copying
+     * @throws IOException if the resource is not on the classpath or the write fails
+     */
+    @Override
+    public void writeShim(FileReference outputFile, byte[] copyBuf) throws IOException {
+        // try-with-resources skips close() for a null resource and keeps a close() failure as a suppressed
+        // exception instead of letting it replace the primary one
+        try (InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName())) {
+            if (is == null) {
+                throw new IOException("Classpath does not contain necessary Python resources!");
+            }
+            // cloudMode is initialised from ncs.getConfiguration().isCloudDeployment() in the constructor
+            IIOManager targetIoManager = cloudMode ? ((ICloudIOManager) ioManager).getLocalIOManager() : ioManager;
+            writeAndForce(outputFile, is, copyBuf, targetIoManager);
+        }
+    }
+
+ /**
+ * Returns this library manager's {@code ioManager} field as-is; no check is made here that it is
+ * actually a cloud IO manager.
+ */
+ @Override
+ public IIOManager getCloudIOManager() {
+ return ioManager;
+ }
+
+    /**
+     * Serializes {@code desc} with the given library manager and writes the bytes to {@code descFile} through
+     * the IO manager returned by {@link ILibraryManager#getCloudIOManager()}.
+     *
+     * @param libraryManager manager used to serialize the descriptor and to perform the write
+     * @param descFile       destination descriptor file
+     * @param desc           descriptor to persist
+     * @param cloud          not read by this method
+     * @param copyBuf        buffer used while copying
+     * @throws IOException if serialization or the write fails
+     */
+    public static void writeDescriptor(ILibraryManager libraryManager, FileReference descFile, LibraryDescriptor desc,
+            boolean cloud, byte[] copyBuf) throws IOException {
+        final byte[] serialized = libraryManager.serializeLibraryDescriptor(desc);
+        final ByteArrayInputStream descriptorStream = new ByteArrayInputStream(serialized);
+        final IIOManager targetIoManager = libraryManager.getCloudIOManager();
+        libraryManager.writeAndForce(descFile, descriptorStream, copyBuf, targetIoManager);
+    }
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
index b254836..e9c7d72 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
@@ -61,10 +62,14 @@
protected IIOManager ioManager;
+ protected ICloudIOManager cloudIoManager;
+
protected ILibraryManager libraryManager;
private FileReference libraryDir;
+ protected boolean cloudMode = false;
+
protected AbstractLibraryNodePushable(IHyracksTaskContext ctx) {
this.ctx = ctx;
}
@@ -75,9 +80,13 @@
public final void initialize() throws HyracksDataException {
INcApplicationContext runtimeCtx =
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
- ioManager = runtimeCtx.getIoManager();
+ ioManager = runtimeCtx.getPersistenceIoManager();
libraryManager = runtimeCtx.getLibraryManager();
libraryDir = libraryManager.getLibraryDir(namespace, libraryName);
+ if (runtimeCtx.isCloudDeployment()) {
+ cloudMode = true;
+ cloudIoManager = (ICloudIOManager) ioManager;
+ }
try {
execute();
} catch (IOException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
index 0c12d43..638444d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -19,22 +19,26 @@
package org.apache.asterix.external.operators;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.asterix.common.library.ILibraryManager.LIBRARY_ARCHIVE_NAME;
+import static org.apache.asterix.external.library.ExternalLibraryManager.CONTENTS_DIR_NAME;
import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+import static org.apache.asterix.external.library.ExternalLibraryManager.REV_1_DIR_NAME;
+import static org.apache.asterix.external.library.ExternalLibraryManager.writeDescriptor;
+import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.ENTRYPOINT;
import static org.apache.hyracks.control.common.controllers.NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
+import java.util.Collections;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.LibraryDescriptor;
import org.apache.asterix.common.metadata.Namespace;
-import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.external.util.ExternalLibraryUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -48,7 +52,7 @@
public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private static final Logger LOGGER = LogManager.getLogger(LibraryDeployPrepareOperatorDescriptor.class);
@@ -71,13 +75,30 @@
private final byte[] copyBuf = new byte[4096];
+            /**
+             * Cloud-mode deployment: downloads the (normalized) library directory through the cloud IO manager,
+             * unzips the archive into {@code REV_1_DIR_NAME/CONTENTS_DIR_NAME}, writes the entrypoint shim into
+             * that contents directory and copies the descriptor into the parent of the contents directory,
+             * replacing any existing copy.
+             *
+             * @throws IOException if the download, unzip, shim write or descriptor copy fails
+             */
+            private void cloudDeploy() throws IOException {
+                final FileReference rawLibDir = getLibraryDir();
+                final String normalizedRelPath = FilenameUtils.normalize(rawLibDir.getRelativePath());
+                final FileReference libDir = new FileReference(rawLibDir.getDeviceHandle(), normalizedRelPath);
+                cloudIoManager.downloadLibrary(Collections.singletonList(libDir));
+
+                final FileReference contentsDir = libDir.getChild(REV_1_DIR_NAME).getChild(CONTENTS_DIR_NAME);
+                libraryManager.unzip(libDir.getChild(LIBRARY_ARCHIVE_NAME), contentsDir);
+                libraryManager.writeShim(contentsDir.getChild(ENTRYPOINT), copyBuf);
+
+                final Path descriptorSource = libDir.getChild(DESCRIPTOR_FILE_NAME).getFile().toPath();
+                final Path descriptorTarget =
+                        contentsDir.getParent().getChild(DESCRIPTOR_FILE_NAME).getFile().toPath();
+                Files.copy(descriptorSource, descriptorTarget, REPLACE_EXISTING);
+            }
+
@Override
protected void execute() throws IOException {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Prepare deployment of library {}.{}", namespace, libraryName);
}
- // #. create library dir if necessary, clean 'stage' dir
+ if (libLocation == null && cloudMode) {
+ cloudDeploy();
+ return;
+ }
+
+ //#. create library dir if necessary, clean 'stage' dir
+
FileReference libDir = getLibraryDir();
Path libDirPath = libDir.getFile().toPath();
@@ -121,7 +142,7 @@
}
MessageDigest digest = libraryManager.download(targetFile, authToken, libLocation);
// extract from the archive
- FileReference contentsDir = stageDir.getChild(ExternalLibraryManager.CONTENTS_DIR_NAME);
+ FileReference contentsDir = stageDir.getChild(CONTENTS_DIR_NAME);
mkdir(contentsDir);
if (LOGGER.isDebugEnabled()) {
@@ -147,8 +168,9 @@
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Writing library descriptor into {}", targetDescFile);
}
- writeDescriptor(targetDescFile,
- new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)));
+ writeDescriptor(libraryManager, targetDescFile,
+ new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)), false,
+ copyBuf);
flushDirectory(contentsDir);
flushDirectory(stageDir);
@@ -158,7 +180,7 @@
boolean writeMsgpack) throws IOException {
FileReference msgpack = stageDir.getChild("msgpack.pyz");
if (writeMsgpack) {
- writeShim(msgpack);
+ libraryManager.writeShim(msgpack, copyBuf);
File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
FileReference msgPackFolderRef =
new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
@@ -166,24 +188,7 @@
Files.delete(msgpack.getFile().toPath());
}
libraryManager.unzip(sourceFile, contentsDir);
- writeShim(contentsDir.getChild("entrypoint.py"));
- }
-
- private void writeShim(FileReference outputFile) throws IOException {
- InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName());
- if (is == null) {
- throw new IOException("Classpath does not contain necessary Python resources!");
- }
- try {
- libraryManager.writeAndForce(outputFile, is, copyBuf);
- } finally {
- is.close();
- }
- }
-
- private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException {
- byte[] bytes = libraryManager.serializeLibraryDescriptor(desc);
- libraryManager.writeAndForce(descFile, new ByteArrayInputStream(bytes), copyBuf);
+ libraryManager.writeShim(contentsDir.getChild(ENTRYPOINT), copyBuf);
}
};
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 9b6a2ff..62e0829 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -19,9 +19,12 @@
package org.apache.hyracks.cloud.io;
import java.nio.ByteBuffer;
+import java.util.Collection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
import org.apache.hyracks.cloud.io.request.ICloudRequest;
import org.apache.hyracks.cloud.io.stream.CloudInputStream;
@@ -32,6 +35,8 @@
* file operations in a cloud deployment.
*/
public interface ICloudIOManager {
+ /**
+ * Downloads the given library paths
+ *
+ * @param libPath library paths to download
+ */
+ void downloadLibrary(Collection<FileReference> libPath) throws HyracksDataException;
+
/**
* Read from the cloud
*
@@ -105,4 +110,6 @@
* @param resourcePath to evict
*/
void evict(String resourcePath) throws HyracksDataException;
+
+ /**
+ * @return the local {@link IIOManager} of this cloud IO manager
+ */
+ IIOManager getLocalIOManager();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 960f23b..37e1477 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -382,7 +382,7 @@
}
private File getWorkspaceFolder(IODeviceHandle dev) {
- return new File(dev.getMount(), dev.getWorkspace());
+ return Path.of(dev.getMount().getPath(), dev.getWorkspace()).normalize().toFile();
}
@Override
@@ -491,8 +491,12 @@
@Override
public int write(ByteBuffer src) throws IOException {
+ int origPos = src.position();
int written = IOManager.this.syncWrite(fHandle, position, src);
position += written;
+ if (src.position() < origPos + written) {
+ src.position(origPos + written);
+ }
return written;
}