[NO ISSUE][COMP][RT] New library deployment model

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Deploy external libraries using Hyracks jobs
- Library manager is no longer available on CC
- Add IServlet.init() which is invoked during web server startup
- External libraries can no longer provide implementations of
  IExternalDataSourceFactory because external libraries are
  not available on CC
- Added a test case for an external adapter

Change-Id: If64f99f6a15b81b1e426239bde63360f5ef57059
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6863
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 4ca3275..8c35ea3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -32,7 +32,6 @@
 import org.apache.asterix.common.exceptions.NoOpWarningCollector;
 import org.apache.asterix.common.exceptions.WarningCollector;
 import org.apache.asterix.common.exceptions.WarningUtil;
-import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
@@ -364,10 +363,9 @@
         }
 
         private boolean canConstantFold(ScalarFunctionCallExpression function) throws AlgebricksException {
-            // skip external functions that are not implemented in Java
+            // skip external functions because they're not available at compile time (on CC)
             IFunctionInfo fi = function.getFunctionInfo();
-            if (fi instanceof IExternalFunctionInfo
-                    && !ExternalFunctionLanguage.JAVA.equals(((IExternalFunctionInfo) fi).getLanguage())) {
+            if (fi instanceof IExternalFunctionInfo) {
                 return false;
             }
             // skip all functions that would produce records/arrays/multisets (derived types) in their open format
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
index bac8d66..d8e167f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
@@ -20,6 +20,7 @@
 
 import static org.apache.asterix.api.http.server.ServletConstants.CREDENTIAL_MAP;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -50,7 +51,6 @@
 
     @Override
     public void handle(IServletRequest request, IServletResponse response) {
-
         try {
             boolean authorized = authorize(request);
             if (!authorized) {
@@ -89,7 +89,7 @@
             return false;
         }
         String providedUsername = providedCredentials[0];
-        String storedPw = storedCredentials.get(providedUsername);
+        String storedPw = getStoredCredentials(request).get(providedUsername);
         if (storedPw == null) {
             LOGGER.debug("Invalid username");
             return false;
@@ -102,4 +102,18 @@
             return false;
         }
     }
+
+    protected Map<String, String> getStoredCredentials(IServletRequest request) {
+        return storedCredentials;
+    }
+
+    public static String hashPassword(String password) {
+        return BCrypt.hashpw(password, BCrypt.gensalt(12));
+    }
+
+    public static String createAuthHeader(String user, String password) {
+        String auth = user + ":" + password;
+        byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.ISO_8859_1));
+        return "Basic " + new String(encodedAuth);
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
index b0da8e0..d0ae6ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
@@ -18,60 +18,71 @@
  */
 package org.apache.asterix.api.http.server;
 
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA;
 import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON;
-import static org.apache.asterix.common.library.LibraryDescriptor.DESCRIPTOR_NAME;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintWriter;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.rmi.RemoteException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.asterix.app.external.ExternalLibraryUtils;
-import org.apache.asterix.app.message.LoadUdfMessage;
+import org.apache.asterix.app.result.ResponsePrinter;
+import org.apache.asterix.app.translator.RequestParameters;
 import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
-import org.apache.asterix.common.library.ILibrary;
-import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.library.LibraryDescriptor;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.common.metadata.IMetadataLockUtil;
-import org.apache.asterix.common.metadata.LockList;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
 import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Function;
-import org.apache.asterix.metadata.entities.Library;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullWriter;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.deployment.DeploymentId;
-import org.apache.hyracks.api.io.IPersistedResourceRegistry;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.api.exceptions.IFormattedException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.context.ServerContext;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import com.google.common.collect.ImmutableMap;
-
-import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.HttpScheme;
 import io.netty.handler.codec.http.multipart.FileUpload;
 import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
 import io.netty.handler.codec.http.multipart.InterfaceHttpData;
@@ -79,27 +90,68 @@
 public class UdfApiServlet extends BasicAuthServlet {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final ICcApplicationContext appCtx;
-    private final ICCMessageBroker broker;
-    private static final String UDF_TMP_DIR_PREFIX = "udf_temp";
-    public static final int UDF_RESPONSE_TIMEOUT = 5000;
-    private Map<String, ExternalFunctionLanguage> exensionMap =
-            new ImmutableMap.Builder<String, ExternalFunctionLanguage>().put("pyz", PYTHON).put("zip", JAVA).build();
 
-    public UdfApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... paths) {
+    private final ICcApplicationContext appCtx;
+    private final ClusterControllerService ccs;
+    private final HttpScheme httpServerProtocol;
+    private final int httpServerPort;
+
+    private final ILangCompilationProvider compilationProvider;
+    private final IStatementExecutorFactory statementExecutorFactory;
+    private final IStorageComponentProvider componentProvider;
+    private final IReceptionist receptionist;
+    private final Path workingDir;
+    private Map<String, String> sysCredentials;
+    private String sysAuthHeader;
+
+    public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx,
+            ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
+            IStorageComponentProvider componentProvider, HttpScheme httpServerProtocol, int httpServerPort) {
         super(ctx, paths);
         this.appCtx = appCtx;
-        this.broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        ICCServiceContext srvCtx = appCtx.getServiceContext();
+        this.ccs = (ClusterControllerService) srvCtx.getControllerService();
+        this.compilationProvider = compilationProvider;
+        this.statementExecutorFactory = statementExecutorFactory;
+        this.componentProvider = componentProvider;
+        this.receptionist = appCtx.getReceptionist();
+        this.httpServerProtocol = httpServerProtocol;
+        this.httpServerPort = httpServerPort;
+        File baseDir = srvCtx.getServerCtx().getBaseDir();
+        this.workingDir = baseDir.getAbsoluteFile().toPath().normalize().resolve(
+                Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, "tmp"));
     }
 
-    private Pair<String, DataverseName> getResource(FullHttpRequest req) throws IllegalArgumentException {
-        String[] path = new QueryStringDecoder(req.uri()).path().split("/");
-        if (path.length != 5) {
-            throw new IllegalArgumentException("Invalid resource.");
+    @Override
+    public void init() throws IOException {
+        super.init();
+        initAuth();
+        initStorage();
+    }
+
+    private void initAuth() {
+        // generate internal user
+        String sysUser;
+        do {
+            sysUser = generateRandomString(32);
+        } while (storedCredentials.containsKey(sysUser));
+        String sysPassword = generateRandomString(128);
+        this.sysCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword));
+        this.sysAuthHeader = createAuthHeader(sysUser, sysPassword);
+    }
+
+    private void initStorage() throws IOException {
+        // prepare working directory
+        if (Files.isDirectory(workingDir)) {
+            try {
+                FileUtils.cleanDirectory(workingDir.toFile());
+            } catch (IOException e) {
+                LOGGER.warn("Could not clean directory: " + workingDir, e);
+            }
+        } else {
+            Files.deleteIfExists(workingDir);
+            FileUtil.forceMkdirs(workingDir.toFile());
         }
-        String resourceName = path[path.length - 1];
-        DataverseName dataverseName = DataverseName.createFromCanonicalForm(path[path.length - 2]); // TODO: use path separators instead for multiparts
-        return new Pair<>(resourceName, dataverseName);
     }
 
     @Override
@@ -109,161 +161,67 @@
             response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
             return;
         }
-
-        PrintWriter responseWriter = response.writer();
-        FullHttpRequest req = request.getHttpRequest();
-        Pair<String, DataverseName> resourceNames;
-        try {
-            resourceNames = getResource(req);
-        } catch (IllegalArgumentException e) {
+        HttpRequest httpRequest = request.getHttpRequest();
+        Pair<DataverseName, String> libraryName = parseLibraryName(request);
+        if (libraryName == null) {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        String resourceName = resourceNames.first;
-        DataverseName dataverse = resourceNames.second;
-        HttpPostRequestDecoder multipartDec = new HttpPostRequestDecoder(req);
-        File udfFile = null;
-        IMetadataLockUtil mdLockUtil = appCtx.getMetadataLockUtil();
-        MetadataTransactionContext mdTxnCtx = null;
-        LockList mdLockList = null;
+        Path libraryTempFile = null;
+        HttpPostRequestDecoder requestDecoder = new HttpPostRequestDecoder(httpRequest);
         try {
-            if (!multipartDec.hasNext() || multipartDec.getBodyHttpDatas().size() != 1) {
+            if (!requestDecoder.hasNext() || requestDecoder.getBodyHttpDatas().size() != 1) {
                 response.setStatus(HttpResponseStatus.BAD_REQUEST);
                 return;
             }
-            InterfaceHttpData f = multipartDec.getBodyHttpDatas().get(0);
-            if (!f.getHttpDataType().equals(InterfaceHttpData.HttpDataType.FileUpload)) {
+            InterfaceHttpData httpData = requestDecoder.getBodyHttpDatas().get(0);
+            if (!httpData.getHttpDataType().equals(InterfaceHttpData.HttpDataType.FileUpload)) {
                 response.setStatus(HttpResponseStatus.BAD_REQUEST);
                 return;
             }
-            MetadataManager.INSTANCE.init();
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            MetadataProvider metadataProvider = MetadataProvider.create(appCtx, null);
-            mdLockList = metadataProvider.getLocks();
-            mdLockUtil.createLibraryBegin(appCtx.getMetadataLockManager(), metadataProvider.getLocks(), dataverse,
-                    resourceName);
-            File workingDir = new File(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
-                    UDF_TMP_DIR_PREFIX);
-            if (!workingDir.exists()) {
-                FileUtil.forceMkdirs(workingDir);
-            }
-            FileUpload udf = (FileUpload) f;
-            String[] fileNameParts = udf.getFilename().split("\\.");
-            String suffix = fileNameParts[fileNameParts.length - 1];
-            ExternalFunctionLanguage libLang = exensionMap.get(suffix);
-            if (libLang == null) {
+            FileUpload fileUpload = (FileUpload) httpData;
+            String fileExt = FilenameUtils.getExtension(fileUpload.getFilename());
+            ExternalFunctionLanguage language = getLanguageByFileExtension(fileExt);
+            if (language == null) {
                 response.setStatus(HttpResponseStatus.BAD_REQUEST);
                 return;
             }
-            LibraryDescriptor desc = new LibraryDescriptor(libLang);
-            udfFile = File.createTempFile(resourceName, "." + suffix, workingDir);
-            udf.renameTo(udfFile);
-            setupBinariesAndClassloaders(dataverse, resourceName, udfFile, desc);
-            installLibrary(mdTxnCtx, dataverse, resourceName);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            response.setStatus(HttpResponseStatus.OK);
-        } catch (Exception e) {
             try {
-                ExternalLibraryUtils.deleteDeployedUdf(broker, appCtx, dataverse, resourceName);
-            } catch (Exception e2) {
-                e.addSuppressed(e2);
+                IRequestReference requestReference = receptionist.welcome(request);
+                libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt);
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Created temporary file " + libraryTempFile + " for library " + libraryName.first + "."
+                            + libraryName.second);
+                }
+                fileUpload.renameTo(libraryTempFile.toFile());
+                URI downloadURI = createDownloadURI(libraryTempFile);
+                CreateLibraryStatement stmt = new CreateLibraryStatement(libraryName.first, libraryName.second,
+                        language, downloadURI, true, sysAuthHeader);
+                executeStatement(stmt, requestReference);
+                response.setStatus(HttpResponseStatus.OK);
+            } catch (Exception e) {
+                response.setStatus(toHttpErrorStatus(e));
+                PrintWriter responseWriter = response.writer();
+                responseWriter.write(e.getMessage());
+                responseWriter.flush();
+                LOGGER.error("Error creating/updating library " + libraryName.first + "." + libraryName.second, e);
             }
-            if (mdTxnCtx != null) {
+        } finally {
+            requestDecoder.destroy();
+            if (libraryTempFile != null) {
                 try {
-                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                } catch (RemoteException r) {
-                    LOGGER.error("Unable to abort metadata transaction", r);
+                    Files.deleteIfExists(libraryTempFile);
+                } catch (IOException e) {
+                    LOGGER.warn("Could not delete temporary file " + libraryTempFile, e);
                 }
             }
-            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
-            responseWriter.write(e.getMessage());
-            responseWriter.flush();
-            LOGGER.error(e);
-        } finally {
-            multipartDec.destroy();
-            if (udfFile != null) {
-                udfFile.delete();
-            }
-            if (mdLockList != null) {
-                mdLockList.unlock();
-            }
         }
     }
 
-    private File writeDescriptor(File folder, LibraryDescriptor desc) throws IOException {
-        IPersistedResourceRegistry reg = appCtx.getServiceContext().getPersistedResourceRegistry();
-        byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(desc.toJson(reg));
-        File descFile = new File(folder, DESCRIPTOR_NAME);
-        FileUtil.writeAndForce(Paths.get(descFile.getAbsolutePath()), bytes);
-        return descFile;
-    }
-
-    private void setupBinariesAndClassloaders(DataverseName dataverse, String resourceName, File udfFile,
-            LibraryDescriptor desc) throws Exception {
-        IHyracksClientConnection hcc = appCtx.getHcc();
-        ILibraryManager libMgr = appCtx.getLibraryManager();
-        DeploymentId udfName = new DeploymentId(ExternalLibraryUtils.makeDeploymentId(dataverse, resourceName));
-        ILibrary lib = libMgr.getLibrary(dataverse, resourceName);
-        if (lib != null) {
-            deleteUdf(dataverse, resourceName);
-        }
-        File descriptor = writeDescriptor(udfFile.getParentFile(), desc);
-        hcc.deployBinary(udfName, Arrays.asList(udfFile.getAbsolutePath(), descriptor.getAbsolutePath()), true);
-        String deployedPath =
-                FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
-                        DeploymentUtils.DEPLOYMENT, udfName.toString());
-        if (!descriptor.delete()) {
-            throw new IOException("Could not remove already uploaded library descriptor");
-        }
-        libMgr.setUpDeployedLibrary(deployedPath);
-        long reqId = broker.newRequestId();
-        List<INcAddressedMessage> requests = new ArrayList<>();
-        List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
-        ncs.forEach(s -> requests.add(new LoadUdfMessage(dataverse, resourceName, reqId)));
-        broker.sendSyncRequestToNCs(reqId, ncs, requests, UDF_RESPONSE_TIMEOUT);
-    }
-
-    private static void installLibrary(MetadataTransactionContext mdTxnCtx, DataverseName dataverse, String libraryName)
-            throws RemoteException, AlgebricksException {
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
-        if (dv == null) {
-            throw new AsterixException(ErrorCode.UNKNOWN_DATAVERSE);
-        }
-        Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName);
-        if (libraryInMetadata != null) {
-            //replacing binary, library already exists
-            return;
-        }
-        // Add library
-        MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new Library(dataverse, libraryName));
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Added library " + libraryName + " to Metadata");
-        }
-    }
-
-    private static void deleteLibrary(MetadataTransactionContext mdTxnCtx, DataverseName dataverse, String libraryName)
-            throws RemoteException, AlgebricksException {
-        Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
-        if (dv == null) {
-            throw new AsterixException(ErrorCode.UNKNOWN_DATAVERSE, dataverse);
-        }
-        Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName);
-        if (library == null) {
-            throw new AsterixException(ErrorCode.UNKNOWN_LIBRARY, libraryName);
-        }
-        List<Function> functions = MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dataverse);
-        for (Function function : functions) {
-            if (libraryName.equals(function.getLibrary())) {
-                throw new AsterixException(ErrorCode.METADATA_DROP_LIBRARY_IN_USE, libraryName);
-            }
-        }
-        List<DatasourceAdapter> adapters = MetadataManager.INSTANCE.getDataverseAdapters(mdTxnCtx, dataverse);
-        for (DatasourceAdapter adapter : adapters) {
-            if (libraryName.equals(adapter.getLibrary())) {
-                throw new AsterixException(ErrorCode.METADATA_DROP_LIBRARY_IN_USE, libraryName);
-            }
-        }
-        MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverse, libraryName);
+    private URI createDownloadURI(Path file) throws Exception {
+        String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName();
+        String host = getHyracksClientConnection().getHost();
+        return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null);
     }
 
     @Override
@@ -273,54 +231,139 @@
             response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
             return;
         }
-
-        Pair<String, DataverseName> resourceNames;
-        try {
-            resourceNames = getResource(request.getHttpRequest());
-        } catch (IllegalArgumentException e) {
+        Pair<DataverseName, String> libraryName = parseLibraryName(request);
+        if (libraryName == null) {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        PrintWriter responseWriter = response.writer();
-        String resourceName = resourceNames.first;
-        DataverseName dataverse = resourceNames.second;
         try {
-            deleteUdf(dataverse, resourceName);
+            IRequestReference requestReference = receptionist.welcome(request);
+            LibraryDropStatement stmt = new LibraryDropStatement(libraryName.first, libraryName.second);
+            executeStatement(stmt, requestReference);
+            response.setStatus(HttpResponseStatus.OK);
         } catch (Exception e) {
-            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            response.setStatus(toHttpErrorStatus(e));
+            PrintWriter responseWriter = response.writer();
             responseWriter.write(e.getMessage());
             responseWriter.flush();
-            return;
+            LOGGER.error("Error deleting library " + libraryName.first + "." + libraryName.second, e);
         }
-        response.setStatus(HttpResponseStatus.OK);
     }
 
-    private void deleteUdf(DataverseName dataverse, String resourceName) throws Exception {
-        IMetadataLockUtil mdLockUtil = appCtx.getMetadataLockUtil();
-        MetadataTransactionContext mdTxnCtx = null;
-        LockList mdLockList = null;
+    private void executeStatement(Statement statement, IRequestReference requestReference) throws Exception {
+        SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM),
+                new PrintWriter(NullWriter.NULL_WRITER));
+        ResponsePrinter printer = new ResponsePrinter(sessionOutput);
+        ResultProperties resultProperties = new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE, 1);
+        IRequestParameters requestParams = new RequestParameters(requestReference, "", null, resultProperties,
+                new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null,
+                Collections.emptyMap(), Collections.emptyMap(), false);
+        MetadataManager.INSTANCE.init();
+        IStatementExecutor translator = statementExecutorFactory.create(appCtx, Collections.singletonList(statement),
+                sessionOutput, compilationProvider, componentProvider, printer);
+        translator.compileAndExecute(getHyracksClientConnection(), requestParams);
+    }
+
+    @Override
+    protected void get(IServletRequest request, IServletResponse response) throws Exception {
+        IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
+        if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+            response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+            return;
+        }
+        String localPath = localPath(request);
+        while (localPath.startsWith("/")) {
+            localPath = localPath.substring(1);
+        }
+        if (localPath.isEmpty()) {
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            return;
+        }
+        Path filePath = workingDir.resolve(localPath).normalize();
+        if (!filePath.startsWith(workingDir)) {
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            return;
+        }
+        readFromFile(filePath, response);
+    }
+
+    private IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
+        IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+        if (hcc == null) {
+            throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
+        }
+        return hcc;
+    }
+
+    @Override
+    protected Map<String, String> getStoredCredentials(IServletRequest request) {
+        return request.getHttpRequest().method().equals(HttpMethod.GET) ? sysCredentials
+                : super.getStoredCredentials(request);
+    }
+
+    private Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException {
+        String[] path = StringUtils.split(localPath(request), '/');
+        int ln = path.length;
+        if (ln < 2) {
+            return null;
+        }
+        String libraryName = path[ln - 1];
+        DataverseName dataverseName = DataverseName.create(Arrays.asList(path), 0, ln - 1);
+        return new Pair<>(dataverseName, libraryName);
+    }
+
+    private static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) {
+        switch (fileExtension) {
+            case LibraryDescriptor.FILE_EXT_ZIP:
+                return JAVA;
+            case LibraryDescriptor.FILE_EXT_PYZ:
+                return PYTHON;
+            default:
+                return null;
+        }
+    }
+
+    private HttpResponseStatus toHttpErrorStatus(Exception e) {
+        if (e instanceof IFormattedException) {
+            IFormattedException fe = (IFormattedException) e;
+            if (ErrorCode.ASTERIX.equals(fe.getComponent())) {
+                switch (fe.getErrorCode()) {
+                    case ErrorCode.UNKNOWN_DATAVERSE:
+                    case ErrorCode.UNKNOWN_LIBRARY:
+                        return HttpResponseStatus.NOT_FOUND;
+                }
+            }
+        }
+        return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+    }
+
+    private static String generateRandomString(int size) {
+        return RandomStringUtils.randomAlphanumeric(size);
+    }
+
+    protected void readFromFile(Path filePath, IServletResponse response) throws Exception {
+        class InputStreamGetter extends SynchronizableWork {
+            private InputStream is;
+
+            @Override
+            protected void doRun() throws Exception {
+                is = Files.newInputStream(filePath);
+            }
+        }
+
+        InputStreamGetter r = new InputStreamGetter();
+        ccs.getWorkQueue().scheduleAndSync(r);
+
+        if (r.is == null) {
+            response.setStatus(HttpResponseStatus.NOT_FOUND);
+            return;
+        }
         try {
-            MetadataManager.INSTANCE.init();
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            MetadataProvider metadataProvider = MetadataProvider.create(appCtx, null);
-            mdLockList = metadataProvider.getLocks();
-            mdLockUtil.dropLibraryBegin(appCtx.getMetadataLockManager(), metadataProvider.getLocks(), dataverse,
-                    resourceName);
-            deleteLibrary(mdTxnCtx, dataverse, resourceName);
-            ExternalLibraryUtils.deleteDeployedUdf(broker, appCtx, dataverse, resourceName);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            try {
-                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-            } catch (RemoteException r) {
-                LOGGER.error("Unable to abort metadata transaction", r);
-            }
-            LOGGER.error(e);
-            throw e;
+            response.setStatus(HttpResponseStatus.OK);
+            HttpUtil.setContentType(response, "application/octet-stream");
+            IOUtils.copyLarge(r.is, response.outputStream());
         } finally {
-            if (mdLockList != null) {
-                mdLockList.unlock();
-            }
+            r.is.close();
         }
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java
new file mode 100644
index 0000000..dd1b736
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.external.operators.LibraryDeployAbortOperatorDescriptor;
+import org.apache.asterix.external.operators.LibraryDeployCommitOperatorDescriptor;
+import org.apache.asterix.external.operators.LibraryDeployPrepareOperatorDescriptor;
+import org.apache.asterix.external.operators.LibraryUndeployOperatorDescriptor;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class ExternalLibraryUtil {
+
+    private ExternalLibraryUtil() {
+    }
+
+    public static Triple<JobSpecification, JobSpecification, JobSpecification> buildCreateLibraryJobSpec(
+            DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language, URI downloadURI,
+            String authToken, MetadataProvider metadataProvider) {
+
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitsAndConstraints(appCtx);
+
+        JobSpecification prepareJobSpec = createLibraryPrepareJobSpec(dataverseName, libraryName, language, downloadURI,
+                authToken, appCtx, splitsAndConstraint);
+
+        JobSpecification commitJobSpec =
+                createLibraryCommitJobSpec(dataverseName, libraryName, appCtx, splitsAndConstraint);
+
+        JobSpecification abortJobSpec =
+                createLibraryAbortJobSpec(dataverseName, libraryName, appCtx, splitsAndConstraint);
+
+        return new Triple<>(prepareJobSpec, commitJobSpec, abortJobSpec);
+    }
+
+    private static JobSpecification createLibraryPrepareJobSpec(DataverseName dataverseName, String libraryName,
+            ExternalFunctionLanguage language, URI downloadURI, String authToken, ICcApplicationContext appCtx,
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint) {
+        JobSpecification jobSpec = RuntimeUtils.createJobSpecification(appCtx);
+        IOperatorDescriptor opDesc = new LibraryDeployPrepareOperatorDescriptor(jobSpec, dataverseName, libraryName,
+                language, downloadURI, authToken);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
+                splitsAndConstraint.second);
+        jobSpec.addRoot(opDesc);
+        return jobSpec;
+    }
+
+    private static JobSpecification createLibraryCommitJobSpec(DataverseName dataverseName, String libraryName,
+            ICcApplicationContext appCtx, Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint) {
+        JobSpecification jobSpec = RuntimeUtils.createJobSpecification(appCtx);
+        IOperatorDescriptor opDesc = new LibraryDeployCommitOperatorDescriptor(jobSpec, dataverseName, libraryName);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
+                splitsAndConstraint.second);
+        return jobSpec;
+    }
+
+    private static JobSpecification createLibraryAbortJobSpec(DataverseName dataverseName, String libraryName,
+            ICcApplicationContext appCtx, Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint) {
+        JobSpecification jobSpec = RuntimeUtils.createJobSpecification(appCtx);
+        IOperatorDescriptor opDesc = new LibraryDeployAbortOperatorDescriptor(jobSpec, dataverseName, libraryName);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
+                splitsAndConstraint.second);
+        return jobSpec;
+    }
+
+    public static JobSpecification buildDropLibraryJobSpec(DataverseName dataverseName, String libraryName,
+            MetadataProvider metadataProvider) {
+        ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitsAndConstraints(appCtx);
+
+        JobSpecification jobSpec = RuntimeUtils.createJobSpecification(appCtx);
+        IOperatorDescriptor opDesc = new LibraryUndeployOperatorDescriptor(jobSpec, dataverseName, libraryName);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
+                splitsAndConstraint.second);
+        jobSpec.addRoot(opDesc);
+
+        return jobSpec;
+    }
+
+    private static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitsAndConstraints(
+            ICcApplicationContext appCtx) {
+        FileSplit[] splits = getSplits(appCtx.getClusterStateManager());
+        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    }
+
+    private static FileSplit[] getSplits(IClusterStateManager clusterStateManager) {
+        ClusterPartition[] clusterPartitions = clusterStateManager.getClusterPartitons();
+        Arrays.sort(clusterPartitions,
+                Comparator.comparing(ClusterPartition::getNodeId).thenComparingInt(ClusterPartition::getIODeviceNum));
+        List<FileSplit> splits = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions) {
+            String activeNodeId = partition.getActiveNodeId();
+            int n = splits.size();
+            if (n > 0 && splits.get(n - 1).getNodeName().equals(activeNodeId)) {
+                continue;
+            }
+            FileSplit split = StoragePathUtil.getFileSplitForClusterPartition(partition, ".");
+            splits.add(split);
+        }
+        return splits.toArray(new FileSplit[0]);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
deleted file mode 100755
index e5d874d..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import static org.apache.asterix.api.http.server.UdfApiServlet.UDF_RESPONSE_TIMEOUT;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.app.message.DeleteUdfMessage;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.hyracks.api.deployment.DeploymentId;
-
-public class ExternalLibraryUtils {
-
-    private ExternalLibraryUtils() {
-    }
-
-    public static void deleteDeployedUdf(ICCMessageBroker broker, ICcApplicationContext appCtx,
-            DataverseName dataverseName, String lib) throws Exception {
-        long reqId = broker.newRequestId();
-        List<INcAddressedMessage> requests = new ArrayList<>();
-        List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
-        ncs.forEach(s -> requests.add(new DeleteUdfMessage(dataverseName, lib, reqId)));
-        broker.sendSyncRequestToNCs(reqId, ncs, requests, UDF_RESPONSE_TIMEOUT);
-        appCtx.getLibraryManager().deregister(dataverseName, lib);
-        appCtx.getHcc().unDeployBinary(new DeploymentId(makeDeploymentId(dataverseName, lib)));
-    }
-
-    public static String makeDeploymentId(DataverseName dv, String resourceName) {
-        List<String> dvParts = dv.getParts();
-        dvParts.add(resourceName);
-        DataverseName dvWithLibrarySuffix = DataverseName.create(dvParts);
-        return dvWithLibrarySuffix.getCanonicalForm();
-    }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java
deleted file mode 100644
index b8c77e7..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.messaging.CcIdentifiedMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public abstract class AbstractUdfMessage extends CcIdentifiedMessage implements INcAddressedMessage {
-
-    protected final DataverseName dataverseName;
-    protected final String libraryName;
-    protected static final Logger LOGGER = LogManager.getLogger();
-
-    private static final long serialVersionUID = 3L;
-
-    private final long reqId;
-
-    public AbstractUdfMessage(DataverseName dataverseName, String libraryName, long reqId) {
-        this.dataverseName = dataverseName;
-        this.libraryName = libraryName;
-        this.reqId = reqId;
-    }
-
-    @Override
-    public void handle(INcApplicationContext appCtx) {
-        ILibraryManager mgr = appCtx.getLibraryManager();
-        INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
-        try {
-            handleAction(mgr, appCtx);
-            broker.sendMessageToCC(getCcId(), new UdfResponseMessage(reqId, null));
-        } catch (Exception e) {
-            try {
-                LOGGER.error("Error in UDF distribution", e);
-                broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, e));
-            } catch (Exception f) {
-                LOGGER.error("Unable to send failure response to CC", f);
-            }
-        }
-
-    }
-
-    protected abstract void handleAction(ILibraryManager mgr, INcApplicationContext appCtx) throws Exception;
-
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
deleted file mode 100644
index 890f1fe..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.DataverseName;
-
-public class DeleteUdfMessage extends AbstractUdfMessage {
-
-    private static final long serialVersionUID = 3L;
-
-    public DeleteUdfMessage(DataverseName dataverseName, String libraryName, long reqId) {
-        super(dataverseName, libraryName, reqId);
-    }
-
-    @Override
-    protected void handleAction(ILibraryManager mgr, INcApplicationContext appCtx) {
-        mgr.deregister(dataverseName, libraryName);
-    }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
deleted file mode 100644
index f2d9174..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
-import org.apache.hyracks.util.file.FileUtil;
-
-public class LoadUdfMessage extends AbstractUdfMessage {
-
-    private static final long serialVersionUID = 2L;
-
-    public LoadUdfMessage(DataverseName dataverseName, String libraryName, long reqId) {
-        super(dataverseName, libraryName, reqId);
-    }
-
-    @Override
-    protected void handleAction(ILibraryManager mgr, INcApplicationContext appCtx) throws Exception {
-        appCtx.getLibraryManager().setUpDeployedLibrary(
-                FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
-                        DeploymentUtils.DEPLOYMENT, dataverseName.getCanonicalForm() + "." + libraryName));
-    }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java
deleted file mode 100644
index 29a91c5..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.message;
-
-import java.util.ArrayList;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.messaging.api.INcResponse;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class UdfResponseMessage implements ICcAddressedMessage, INcResponse {
-
-    private static final long serialVersionUID = -4520773141458281271L;
-    private final long reqId;
-    private final Exception failure;
-
-    public UdfResponseMessage(long reqId, Exception failure) {
-        this.reqId = reqId;
-        this.failure = failure;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setResult(MutablePair<ICCMessageBroker.ResponseState, Object> result) {
-        ICCMessageBroker.ResponseState responseState = result.getLeft();
-        if (failure != null) {
-            result.setLeft(ICCMessageBroker.ResponseState.FAILURE);
-            result.setRight(failure);
-            return;
-        }
-        switch (responseState) {
-            case UNINITIALIZED:
-                // First to arrive
-                result.setRight(new ArrayList<String>());
-                // No failure, change state to success
-                result.setLeft(ICCMessageBroker.ResponseState.SUCCESS);
-                // Fallthrough
-            case SUCCESS:
-                break;
-            default:
-                break;
-
-        }
-    }
-
-    @Override
-    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
-        broker.respond(reqId, this);
-    }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e9da651..5c15e68 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -80,6 +80,7 @@
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -142,7 +143,7 @@
     private ActiveManager activeManager;
     private IReplicationChannel replicationChannel;
     private IReplicationManager replicationManager;
-    private final ILibraryManager libraryManager;
+    private ExternalLibraryManager libraryManager;
     private final NCExtensionManager ncExtensionManager;
     private final IStorageComponentProvider componentProvider;
     private final IPersistedResourceRegistry persistedResourceRegistry;
@@ -170,8 +171,6 @@
         componentProvider = new StorageComponentProvider();
         resourceIdFactory = new GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory();
         persistedResourceRegistry = ncServiceContext.getPersistedResourceRegistry();
-        libraryManager =
-                new ExternalLibraryManager(ncServiceContext.getServerCtx().getAppDir(), persistedResourceRegistry);
         cacheManager = new CacheManager();
     }
 
@@ -198,7 +197,8 @@
         txnSubsystem = new TransactionSubsystem(this, recoveryManagerFactory);
         IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
         SystemState systemState = recoveryMgr.getSystemState();
-        if (initialRun || systemState == SystemState.PERMANENT_DATA_LOSS) {
+        boolean resetStorageData = initialRun || systemState == SystemState.PERMANENT_DATA_LOSS;
+        if (resetStorageData) {
             //delete any storage data before the resource factory is initialized
             if (LOGGER.isWarnEnabled()) {
                 LOGGER.log(Level.WARN,
@@ -253,6 +253,12 @@
                     storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory());
         }
 
+        NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService();
+        FileReference appDir =
+                ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
+        libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir);
+        libraryManager.initStorage(resetStorageData);
+
         /*
          * The order of registration is important. The buffer cache must registered before recovery and transaction
          * managers. Notes: registered components are stopped in reversed order
@@ -280,6 +286,7 @@
         lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
         lccm.register(txnSubsystem.getCheckpointManager());
+        lccm.register(libraryManager);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
deleted file mode 100644
index 1656ce5..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc.task;
-
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-
-public class ExternalLibrarySetupTask implements INCLifecycleTask {
-
-    private static final long serialVersionUID = 1L;
-    private final boolean metadataNode;
-
-    public ExternalLibrarySetupTask(boolean metadataNode) {
-        this.metadataNode = metadataNode;
-    }
-
-    @Override
-    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
-        INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
-        try {
-            appContext.getLibraryManager().scanLibraries(cs.getContext().getServerCtx().getAppDir());
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"metadata-node\" : " + metadataNode + " }";
-    }
-}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 0d86cb9..7576e0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -29,7 +29,6 @@
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
 import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
-import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
 import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
@@ -202,7 +201,6 @@
         if (metadataNode) {
             tasks.add(new MetadataBootstrapTask(clusterManager.getMetadataPartition().getPartitionId()));
         }
-        tasks.add(new ExternalLibrarySetupTask(metadataNode));
         tasks.add(new CheckpointTask());
         tasks.add(new StartLifecycleComponentsTask());
         if (metadataNode) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d1c5aeb..a7df432 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -50,7 +50,7 @@
 import org.apache.asterix.app.active.ActiveEntityEventsListener;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.active.FeedEventsListener;
-import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.app.external.ExternalLibraryUtil;
 import org.apache.asterix.app.result.ExecutionError;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
@@ -81,7 +81,6 @@
 import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.IMetadataLockUtil;
 import org.apache.asterix.common.utils.JobUtils;
@@ -110,6 +109,7 @@
 import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
 import org.apache.asterix.lang.common.statement.CreateSynonymStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -125,6 +125,7 @@
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
 import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -373,6 +374,12 @@
                     case FUNCTION_DROP:
                         handleFunctionDropStatement(metadataProvider, stmt);
                         break;
+                    case CREATE_LIBRARY:
+                        handleCreateLibraryStatement(metadataProvider, stmt, hcc);
+                        break;
+                    case LIBRARY_DROP:
+                        handleLibraryDropStatement(metadataProvider, stmt, hcc);
+                        break;
                     case CREATE_SYNONYM:
                         handleCreateSynonymStatement(metadataProvider, stmt);
                         break;
@@ -1437,8 +1444,6 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
-        List<Library> librariesToDelete = new ArrayList<>();
-        ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
@@ -1512,9 +1517,16 @@
                     ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(dataset);
                 }
             }
+
+            // #. prepare jobs which will drop corresponding libraries.
+            List<Library> libraries = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverseName);
+            for (Library library : libraries) {
+                jobsToExecute.add(ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, library.getName(),
+                        metadataProvider));
+            }
+
             jobsToExecute.add(DataverseUtil.dropDataverseJobSpec(dv, metadataProvider));
 
-            librariesToDelete = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverseName);
             // #. mark PendingDropOp on the dataverse record by
             // first, deleting the dataverse record from the DATAVERSE_DATASET
             // second, inserting the dataverse record with the PendingDropOp value into the
@@ -1531,10 +1543,6 @@
                 runJob(hcc, jobSpec);
             }
 
-            for (Library lib : librariesToDelete) {
-                ExternalLibraryUtils.deleteDeployedUdf(broker, appCtx, dataverseName, lib.getName());
-            }
-
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1567,15 +1575,11 @@
                 }
 
                 // #. execute compensation operations
-                // remove the all indexes in NC
+                // remove all the artifacts in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         runJob(hcc, jobSpec);
                     }
-
-                    for (Library lib : librariesToDelete) {
-                        ExternalLibraryUtils.deleteDeployedUdf(broker, appCtx, dataverseName, lib.getName());
-                    }
                 } catch (Exception e2) {
                     // do no throw exception since still the metadata needs to be compensated.
                     e.addSuppressed(e2);
@@ -2213,6 +2217,205 @@
         }
     }
 
+    protected void handleCreateLibraryStatement(MetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        CreateLibraryStatement cls = (CreateLibraryStatement) stmt;
+        DataverseName dataverseName = getActiveDataverseName(cls.getDataverseName());
+        String libraryName = cls.getLibraryName();
+        lockUtil.createLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName);
+        try {
+            doCreateLibrary(metadataProvider, dataverseName, libraryName, cls, hcc);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
+
+    private void doCreateLibrary(MetadataProvider metadataProvider, DataverseName dataverseName, String libraryName,
+            CreateLibraryStatement cls, IHyracksClientConnection hcc) throws Exception {
+        JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
+        boolean prepareJobSuccessful = false;
+        JobSpecification abortJobSpec = null;
+        Library existingLibrary = null;
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        try {
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            if (dv == null) {
+                throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, dataverseName);
+            }
+            ExternalFunctionLanguage language = cls.getLang();
+            existingLibrary = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverseName, libraryName);
+            if (existingLibrary != null && !cls.getReplaceIfExists()) {
+                throw new CompilationException(ErrorCode.COMPILATION_ERROR,
+                        "A library with this name " + libraryName + " already exists.");
+            }
+
+            // #. add/update library with PendingAddOp
+            Library libraryPendingAdd =
+                    new Library(dataverseName, libraryName, language.name(), MetadataUtil.PENDING_ADD_OP);
+            if (existingLibrary == null) {
+                MetadataManager.INSTANCE.addLibrary(mdTxnCtx, libraryPendingAdd);
+            } else {
+                MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, libraryPendingAdd);
+            }
+
+            // #. prepare to create library artifacts in NC.
+            Triple<JobSpecification, JobSpecification, JobSpecification> jobSpecs =
+                    ExternalLibraryUtil.buildCreateLibraryJobSpec(dataverseName, libraryName, language,
+                            cls.getLocation(), cls.getAuthToken(), metadataProvider);
+            JobSpecification prepareJobSpec = jobSpecs.first;
+            JobSpecification commitJobSpec = jobSpecs.second;
+            abortJobSpec = jobSpecs.third;
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+            // #. create library artifacts in NCs.
+            runJob(hcc, prepareJobSpec, jobFlags);
+            prepareJobSuccessful = true;
+            runJob(hcc, commitJobSpec, jobFlags);
+
+            // #. begin new metadataTxn
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            Library newLibrary = new Library(dataverseName, libraryName, language.name(), MetadataUtil.PENDING_NO_OP);
+            MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, newLibrary);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                boolean undoFailure = false;
+                if (!prepareJobSuccessful) {
+                    // 'prepare' job failed -> try running 'abort' job
+                    try {
+                        runJob(hcc, abortJobSpec, jobFlags);
+                    } catch (Exception e2) {
+                        e.addSuppressed(e2);
+                        undoFailure = true;
+                    }
+                } else if (existingLibrary == null) {
+                    // 'commit' job failed for a new library -> try removing the library
+                    try {
+                        JobSpecification dropLibraryJobSpec = ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName,
+                                libraryName, metadataProvider);
+                        runJob(hcc, dropLibraryJobSpec, jobFlags);
+                    } catch (Exception e2) {
+                        e.addSuppressed(e2);
+                        undoFailure = true;
+                    }
+                } else {
+                    // 'commit' job failed for an existing library -> bad state
+                    undoFailure = true;
+                }
+
+                // revert/remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                try {
+                    if (existingLibrary == null) {
+                        MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+                    } else {
+                        MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, existingLibrary);
+                    }
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is inconsistent state: pending library(" + libraryName
+                            + ") couldn't be reverted/removed from the metadata", e);
+                }
+
+                if (undoFailure) {
+                    throw new IllegalStateException(
+                            "System is inconsistent state: library(" + libraryName + ") couldn't be deployed", e);
+                }
+            }
+            throw e;
+        }
+    }
+
+    protected void handleLibraryDropStatement(MetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws Exception {
+        LibraryDropStatement stmtDropLibrary = (LibraryDropStatement) stmt;
+        DataverseName dataverseName = getActiveDataverseName(stmtDropLibrary.getDataverseName());
+        String libraryName = stmtDropLibrary.getLibraryName();
+        lockUtil.dropLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName);
+        try {
+            doDropLibrary(metadataProvider, dataverseName, libraryName, hcc);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
+
+    private void doDropLibrary(MetadataProvider metadataProvider, DataverseName dataverseName, String libraryName,
+            IHyracksClientConnection hcc) throws Exception {
+        JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        try {
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            if (dv == null) {
+                throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, dataverseName);
+            }
+            Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverseName, libraryName);
+            if (library == null) {
+                throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, libraryName);
+            }
+
+            // #. mark the existing library as PendingDropOp
+            // do drop instead of update because drop will fail if the library is used by functions/adapters
+            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+            MetadataManager.INSTANCE.addLibrary(mdTxnCtx,
+                    new Library(dataverseName, libraryName, library.getLanguage(), MetadataUtil.PENDING_DROP_OP));
+
+            // #. prepare the job that will drop the library artifacts in NCs.
+            JobSpecification jobSpec =
+                    ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+            progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+            // #. drop library artifacts in NCs.
+            runJob(hcc, jobSpec, jobFlags);
+
+            // #. begin new metadataTxn
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+            // #. drop library
+            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+                // remove the record from the metadata.
+                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                try {
+                    MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (Exception e2) {
+                    e.addSuppressed(e2);
+                    abort(e, e2, mdTxnCtx);
+                    throw new IllegalStateException("System is inconsistent state: pending library(" + libraryName
+                            + ") couldn't be removed from the metadata", e);
+                }
+            }
+            throw e;
+        }
+    }
+
     protected void handleCreateSynonymStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
         CreateSynonymStatement css = (CreateSynonymStatement) stmt;
         DataverseName dataverseName = getActiveDataverseName(css.getDataverseName());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index ee5b957..3eeed59 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -79,7 +79,6 @@
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.utils.Servlets;
 import org.apache.asterix.external.adapter.factory.AdapterFactoryService;
-import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
@@ -159,9 +158,7 @@
         componentProvider = new StorageComponentProvider();
         ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions()));
         IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
-        ILibraryManager libraryManager = new ExternalLibraryManager(ccServiceCtx.getServerCtx().getAppDir(),
-                ccServiceCtx.getPersistedResourceRegistry());
-        appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator,
+        appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator,
                 () -> new Receptionist("CC"), ConfigValidator::new, ccExtensionManager, new AdapterFactoryService());
         final CCConfig ccConfig = controllerService.getCCConfig();
         if (System.getProperty("java.rmi.server.hostname") == null) {
@@ -212,10 +209,10 @@
             IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory,
             CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService)
             throws AlgebricksException, IOException {
-        return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
-                globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
-                new MetadataLockManager(), createMetadataLockUtil(), receptionistFactory, configValidatorFactory,
-                ccExtensionManager, adapterFactoryService);
+        return new CcApplicationContext(ccServiceCtx, getHcc(), () -> MetadataManager.INSTANCE, globalRecoveryManager,
+                lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, new MetadataLockManager(),
+                createMetadataLockUtil(), receptionistFactory, configValidatorFactory, ccExtensionManager,
+                adapterFactoryService);
     }
 
     protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
@@ -300,7 +297,7 @@
     }
 
     protected void addServlet(HttpServer server, String path) {
-        server.addServlet(createServlet(server.ctx(), path, path));
+        server.addServlet(createServlet(server, path, path));
     }
 
     protected HttpServer setupQueryWebServer(ExternalProperties externalProperties) throws Exception {
@@ -315,7 +312,8 @@
         return queryWebServer;
     }
 
-    protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String key, String... paths) {
+    protected IServlet createServlet(HttpServer server, String key, String... paths) {
+        ConcurrentMap<String, Object> ctx = server.ctx();
         switch (key) {
             case Servlets.RUNNING_REQUESTS:
                 return new CcQueryCancellationServlet(ctx, appCtx, paths);
@@ -349,9 +347,11 @@
             case Servlets.ACTIVE_STATS:
                 return new ActiveStatsApiServlet(appCtx, ctx, paths);
             case Servlets.UDF:
-                return new UdfApiServlet(appCtx, ctx, paths);
+                return new UdfApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
+                        getStatementExecutorFactory(), componentProvider, server.getScheme(),
+                        server.getAddress().getPort());
             default:
-                throw new IllegalStateException(String.valueOf(key));
+                throw new IllegalStateException(key);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 7093660..8ea0ad4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -267,8 +267,9 @@
                 (FeedIntakeOperatorDescriptor) intakeJob.getOperatorMap().get(new OperatorDescriptorId(0));
         FeedIntakeOperatorDescriptor ingestionOp;
         if (firstOp.getAdaptorFactory() == null) {
-            ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorLibraryName(),
-                    firstOp.getAdaptorFactoryClassName(), firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
+            ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorLibraryDataverse(),
+                    firstOp.getAdaptorLibraryName(), firstOp.getAdaptorFactoryClassName(),
+                    firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
                     firstOp.getOutputRecordDescriptors()[0]);
         } else {
             ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorFactory(),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 117fb63..86d8ed4 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -199,7 +199,6 @@
         List<ILibraryManager> libraryManagers = new ArrayList<>();
         init(deleteOldInstanceData, confDir);
         if (externalLibPath != null && externalLibPath.length() != 0) {
-            libraryManagers.add(((ICcApplicationContext) cc.getApplicationContext()).getLibraryManager());
             for (NodeControllerService nc : ncs) {
                 INcApplicationContext runtimeCtx = (INcApplicationContext) nc.getApplicationContext();
                 libraryManagers.add(runtimeCtx.getLibraryManager());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 30ebc1d..2328bf4 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -24,7 +24,6 @@
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
@@ -92,8 +91,6 @@
         FailedGroup.setName("failed");
 
         List<ILibraryManager> libraryManagers = new ArrayList<>();
-        // Adds the library manager for CC.
-        libraryManagers.add(((ICcApplicationContext) integrationUtil.cc.getApplicationContext()).getLibraryManager());
         // Adds library managers for NCs, one-per-NC.
         for (NodeControllerService nc : integrationUtil.ncs) {
             INcApplicationContext runtimeCtx = (INcApplicationContext) nc.getApplicationContext();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.0.ddl.sqlpp
new file mode 100644
index 0000000..617df1f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.0.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+drop dataverse externallibtest if exists;
+create dataverse externallibtest;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.1.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.1.lib.sqlpp
new file mode 100644
index 0000000..253c657
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.1.lib.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+install externallibtest testlib admin admin target/data/externallib/asterix-external-data-testlib.zip
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.2.ddl.sqlpp
new file mode 100644
index 0000000..9cb4046
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.2.ddl.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+use externallibtest;
+
+create adapter TweetAdapter language java as "testlib",
+  "org.apache.asterix.external.library.adapter.TestTypedAdapterFactory";
+
+create type TweetType as open {
+    tweetid: int64
+};
+
+create dataset Tweets(TweetType) primary key tweetid;
+
+create feed TweetFeed with {
+  "adapter-name": "TweetAdapter",
+  "type-name" : "TweetType",
+  "num_output_records": 4
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.3.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.3.update.sqlpp
new file mode 100644
index 0000000..c23a37d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.3.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+use externallibtest;
+
+set `wait-for-completion-feed` "true";
+
+connect feed TweetFeed to dataset Tweets;
+start feed TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.4.query.sqlpp
new file mode 100644
index 0000000..07a093a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.4.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+use externallibtest;
+
+select value t
+from Tweets t
+order by t.tweetid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/deterministic/deterministic.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/deterministic/deterministic.4.adm
index 18dce53..9b1b82f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/deterministic/deterministic.4.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/deterministic/deterministic.4.adm
@@ -2,7 +2,7 @@
 -- DISTRIBUTE_RESULT  |UNPARTITIONED|
   exchange
   -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-    assign [$$1] <- [{"default": getCapital_default("United States"), "deterministic": { country: "United States", capital: "Washington D.C." }, "not_deterministic": getCapital_not_deterministic("United States")}]
+    assign [$$1] <- [{"default": getCapital_default("United States"), "deterministic": getCapital_deterministic("United States"), "not_deterministic": getCapital_not_deterministic("United States")}]
     -- ASSIGN  |UNPARTITIONED|
       empty-tuple-source
       -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-external-adapter/feed-with-external-adapter.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-external-adapter/feed-with-external-adapter.4.adm
new file mode 100644
index 0000000..9c810dd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-external-adapter/feed-with-external-adapter.4.adm
@@ -0,0 +1,4 @@
+{ "tweetid": 1, "message-text": "1" }
+{ "tweetid": 2, "message-text": "2" }
+{ "tweetid": 3, "message-text": "3" }
+{ "tweetid": 4, "message-text": "4" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
index cd67e42..156d5a0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
@@ -46,7 +46,7 @@
     <test-case FilePath="external-library">
       <compilation-unit name="mysum_dropinuse">
         <output-dir compare="Text">mysum_dropinuse</output-dir>
-        <expected-error>Library testlib is being used. It cannot be dropped</expected-error>
+        <expected-error>Cannot drop library externallibtest.testlib being used by funciton externallibtest.mysum@2</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-library">
@@ -102,5 +102,10 @@
         <output-dir compare="Text">feed-with-external-function</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="feeds">
+      <compilation-unit name="feed-with-external-adapter">
+        <output-dir compare="Text">feed-with-external-adapter</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 0340898..67f8253 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -28,7 +28,6 @@
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
-import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -56,11 +55,6 @@
     NodeProperties getNodeProperties();
 
     /**
-     * @return the library manager which implements {@link org.apache.asterix.common.library.ILibraryManager}
-     */
-    ILibraryManager getLibraryManager();
-
-    /**
      * @return the service context
      */
     IServiceContext getServiceContext();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index eed186c..65b587b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.Executor;
 
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
@@ -134,4 +135,9 @@
      * @return the cache manager
      */
     ICacheManager getCacheManager();
+
+    /**
+     * @return the library manager
+     */
+    ILibraryManager getLibraryManager();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index fdba8d6..9cf6f9d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -324,7 +324,6 @@
     public static final int FAILED_TO_PARSE_METADATA = 3115;
     public static final int INPUT_DECODE_FAILURE = 3116;
     public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
-    public static final int METADATA_DROP_LIBRARY_IN_USE = 3118;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
index 6d2df37..65b415d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
@@ -23,7 +23,7 @@
 
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -76,6 +76,6 @@
      * @throws AlgebricksException
      * @throws HyracksDataException
      */
-    void configure(IServiceContext serviceContext, Map<String, String> configuration,
+    void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
             IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibrary.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibrary.java
index 164c215..9b60bf1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibrary.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibrary.java
@@ -21,10 +21,11 @@
 package org.apache.asterix.common.library;
 
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ILibrary {
 
     ExternalFunctionLanguage getLanguage();
 
-    void close();
+    void close() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
index fdaaaeb..83f1d71 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
@@ -19,20 +19,21 @@
 
 package org.apache.asterix.common.library;
 
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 
 public interface ILibraryManager {
 
-    void setUpDeployedLibrary(String path) throws IOException, AsterixException;
+    ILibrary getLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException;
 
-    void scanLibraries(File appDir);
+    void closeLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException;
 
-    void deregister(DataverseName dv, String name);
+    // deployment helpers
 
-    ILibrary getLibrary(DataverseName dvName, String libraryName);
+    FileReference getLibraryDir(DataverseName dataverseName, String libraryName) throws HyracksDataException;
 
+    void dropLibraryPath(FileReference fileRef) throws HyracksDataException;
+
+    byte[] serializeLibraryDescriptor(LibraryDescriptor libraryDescriptor) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
index 9dd2a3d..72bde09 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
@@ -24,7 +24,6 @@
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
@@ -33,29 +32,35 @@
 public class LibraryDescriptor implements IJsonSerializable {
 
     private static final long serialVersionUID = 1L;
-    public static final String DESCRIPTOR_NAME = "descriptor.json";
-    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
+    private static final String FIELD_LANGUAGE = "lang";
+
+    public static final String FILE_EXT_ZIP = "zip";
+
+    public static final String FILE_EXT_PYZ = "pyz";
+
     /**
      * The library's language
      */
     private final ExternalFunctionLanguage lang;
 
-    public LibraryDescriptor(ExternalFunctionLanguage lang) {
-        this.lang = lang;
+    public LibraryDescriptor(ExternalFunctionLanguage language) {
+        this.lang = language;
     }
 
-    public ExternalFunctionLanguage getLang() {
+    public ExternalFunctionLanguage getLanguage() {
         return lang;
     }
 
     public JsonNode toJson(IPersistedResourceRegistry registry) {
         ObjectNode jsonNode = registry.getClassIdentifier(LibraryDescriptor.class, serialVersionUID);
-        jsonNode.put("lang", lang.name());
+        jsonNode.put(FIELD_LANGUAGE, lang.name());
         return jsonNode;
     }
 
     public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
-        final ExternalFunctionLanguage lang = ExternalFunctionLanguage.valueOf(json.get("lang").asText());
+        String langText = json.get(FIELD_LANGUAGE).asText();
+        ExternalFunctionLanguage lang = ExternalFunctionLanguage.valueOf(langText);
         return new LibraryDescriptor(lang);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 9bbebf2..1cf653f 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -322,7 +322,6 @@
 3115 = Failed to parse record metadata
 3116 = Failed to decode input
 3117 = Failed to parse record, malformed log record
-3118 = Library %1$s is being used. It cannot be dropped
 
 # Lifecycle management errors
 4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
new file mode 100644
index 0000000..45c4b12
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.library.ILibrary;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
+import org.apache.asterix.external.library.JavaLibrary;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public final class ExternalAdapterFactory implements ITypedAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final DataverseName libraryDataverse;
+
+    private final String libraryName;
+
+    private final String className;
+
+    private ARecordType outputType;
+
+    private ARecordType metaType;
+
+    private Map<String, String> configuration;
+
+    private transient ICCServiceContext serviceContext;
+
+    public ExternalAdapterFactory(DataverseName libraryDataverse, String libraryName, String className) {
+        this.libraryDataverse = libraryDataverse;
+        this.libraryName = libraryName;
+        this.className = className;
+    }
+
+    @Override
+    public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
+            IWarningCollector warningCollector) {
+        this.serviceContext = serviceContext;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+        // TODO: the partition count needs to be specified in the adapter configuration
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceContext.getApplicationContext();
+        return IExternalDataSourceFactory.getPartitionConstraints(appCtx, null, 1);
+    }
+
+    @Override
+    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+        INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+        INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
+        ILibraryManager libraryManager = appCtx.getLibraryManager();
+        ILibrary library = libraryManager.getLibrary(libraryDataverse, libraryName);
+        if (ExternalFunctionLanguage.JAVA != library.getLanguage()) {
+            throw new HyracksDataException("Unexpected library language: " + library.getLanguage());
+        }
+        ClassLoader cl = ((JavaLibrary) library).getClassLoader();
+        try {
+            ITypedAdapterFactory adapterFactory = (ITypedAdapterFactory) cl.loadClass(className).newInstance();
+            adapterFactory.setOutputType(outputType);
+            adapterFactory.setMetaType(metaType);
+            adapterFactory.configure(null, configuration, ctx.getWarningCollector());
+            return adapterFactory.createAdapter(ctx, partition);
+        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | AlgebricksException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void setOutputType(ARecordType outputType) {
+        this.outputType = outputType;
+    }
+
+    @Override
+    public ARecordType getOutputType() {
+        return outputType;
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+        this.metaType = metaType;
+    }
+
+    @Override
+    public ARecordType getMetaType() {
+        return metaType;
+    }
+
+    @Override
+    public String getAlias() {
+        return "external:" + className;
+    }
+
+    public DataverseName getLibraryDataverse() {
+        return libraryDataverse;
+    }
+
+    public String getLibraryName() {
+        return libraryName;
+    }
+
+    public String getClassName() {
+        return className;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index badf105..897f020 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -22,7 +22,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -51,6 +50,7 @@
 import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -124,7 +124,7 @@
     private void restoreExternalObjects(IServiceContext serviceContext, ILibraryManager libraryManager,
             IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
         if (dataSourceFactory == null) {
-            dataSourceFactory = createExternalDataSourceFactory(configuration, libraryManager);
+            dataSourceFactory = createExternalDataSourceFactory(configuration);
             // create and configure parser factory
             if (dataSourceFactory.isIndexible() && (files != null)) {
                 ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
@@ -133,7 +133,7 @@
         }
         if (dataParserFactory == null) {
             // create and configure parser factory
-            dataParserFactory = createDataParserFactory(configuration, libraryManager);
+            dataParserFactory = createDataParserFactory(configuration);
             dataParserFactory.setRecordType(recordType);
             dataParserFactory.setMetaType(metaType);
             dataParserFactory.configure(configuration);
@@ -141,18 +141,18 @@
     }
 
     @Override
-    public void configure(IServiceContext serviceContext, Map<String, String> configuration,
+    public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
             IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
         this.configuration = configuration;
-        IApplicationContext appCtx = (IApplicationContext) serviceContext.getApplicationContext();
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceContext.getApplicationContext();
         ExternalDataUtils.validateDataSourceParameters(configuration);
-        dataSourceFactory = createExternalDataSourceFactory(configuration, appCtx.getLibraryManager());
+        dataSourceFactory = createExternalDataSourceFactory(configuration);
         if (dataSourceFactory.isIndexible() && (files != null)) {
             ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
         }
         dataSourceFactory.configure(serviceContext, configuration, warningCollector);
         ExternalDataUtils.validateDataParserParameters(configuration);
-        dataParserFactory = createDataParserFactory(configuration, appCtx.getLibraryManager());
+        dataParserFactory = createDataParserFactory(configuration);
         dataParserFactory.setRecordType(recordType);
         dataParserFactory.setMetaType(metaType);
         dataParserFactory.configure(configuration);
@@ -161,12 +161,12 @@
         nullifyExternalObjects();
     }
 
-    private void configureFeedLogManager(IApplicationContext appCtx) throws HyracksDataException, AlgebricksException {
+    private void configureFeedLogManager(ICcApplicationContext appCtx)
+            throws HyracksDataException, AlgebricksException {
         this.isFeed = ExternalDataUtils.isFeed(configuration);
         if (isFeed) {
-            feedLogFileSplits = FeedUtils.splitsForAdapter((ICcApplicationContext) appCtx,
-                    ExternalDataUtils.getDataverse(configuration), ExternalDataUtils.getFeedName(configuration),
-                    dataSourceFactory.getPartitionConstraint());
+            feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, ExternalDataUtils.getDataverse(configuration),
+                    ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
         }
     }
 
@@ -224,13 +224,12 @@
         configuration = Collections.emptyMap();
     }
 
-    protected IExternalDataSourceFactory createExternalDataSourceFactory(Map<String, String> configuration,
-            ILibraryManager libraryManager) throws HyracksDataException, AsterixException {
-        return DatasourceFactoryProvider.getExternalDataSourceFactory(libraryManager, configuration);
+    protected IExternalDataSourceFactory createExternalDataSourceFactory(Map<String, String> configuration)
+            throws HyracksDataException, AsterixException {
+        return DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
     }
 
-    protected IDataParserFactory createDataParserFactory(Map<String, String> configuration,
-            ILibraryManager libraryManager) throws AsterixException {
-        return ParserFactoryProvider.getDataParserFactory(libraryManager, configuration);
+    protected IDataParserFactory createDataParserFactory(Map<String, String> configuration) throws AsterixException {
+        return ParserFactoryProvider.getDataParserFactory(configuration);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index b543804..fc0c4f9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -21,7 +21,6 @@
 import java.io.Serializable;
 import java.util.Map;
 
-import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.external.api.ILookupReaderFactory;
 import org.apache.asterix.external.api.ILookupRecordReader;
 import org.apache.asterix.external.api.IRecordDataParser;
@@ -34,7 +33,7 @@
 import org.apache.asterix.external.provider.ParserFactoryProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -79,17 +78,14 @@
         }
     }
 
-    public void configure(IServiceContext serviceContext, Map<String, String> configuration,
+    public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
             IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
         this.configuration = configuration;
-        IApplicationContext appCtx = (IApplicationContext) serviceContext.getApplicationContext();
         readerFactory =
                 LookupReaderFactoryProvider.getLookupReaderFactory(serviceContext, configuration, warningCollector);
-        dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider
-                .getDataParserFactory(appCtx.getLibraryManager(), configuration);
+        dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);
         dataParserFactory.setRecordType(recordType);
         readerFactory.configure(serviceContext, configuration, warningCollector);
         dataParserFactory.configure(configuration);
     }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 2533af5..b58e604 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -53,6 +53,8 @@
 
     @Override
     public void close() throws IOException {
-        feedLogManager.close();
+        if (feedLogManager != null) {
+            feedLogManager.close();
+        }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index b09752e..b0a4dfdc 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -20,114 +20,292 @@
 
 import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
 import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
-import static org.apache.asterix.common.library.LibraryDescriptor.DESCRIPTOR_NAME;
 
-import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.Files;
-import java.util.Collections;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.library.ILibrary;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.library.LibraryDescriptor;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 
-public class ExternalLibraryManager implements ILibraryManager {
+public final class ExternalLibraryManager implements ILibraryManager, ILifeCycleComponent {
 
-    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    public static final String LIBRARY_MANAGER_BASE_DIR_NAME = "library";
 
-    static {
-        OBJECT_MAPPER.enable(SerializationFeature.INDENT_OUTPUT);
-        OBJECT_MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
-        OBJECT_MAPPER.configure(ORDER_MAP_ENTRIES_BY_KEYS, true);
+    private static final String STORAGE_DIR_NAME = "storage";
+
+    private static final String TRASH_DIR_NAME = "trash";
+
+    public static final String REV_0_DIR_NAME = "rev_0";
+
+    public static final String REV_1_DIR_NAME = "rev_1";
+
+    public static final String STAGE_DIR_NAME = "stage";
+
+    public static final String CONTENTS_DIR_NAME = "contents";
+
+    public static final String DESCRIPTOR_FILE_NAME = "lib.json";
+
+    private static final Logger LOGGER = LogManager.getLogger(ExternalLibraryManager.class);
+
+    private final NodeControllerService ncs;
+    private final IPersistedResourceRegistry reg;
+    private final ObjectMapper objectMapper;
+    private final FileReference baseDir;
+    private final FileReference storageDir;
+    private final Path storageDirPath;
+    private final FileReference trashDir;
+    private final Path trashDirPath;
+    private final Map<Pair<DataverseName, String>, ILibrary> libraries = new HashMap<>();
+
+    public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir) {
+        this.ncs = ncs;
+        this.reg = reg;
+        baseDir = appDir.getChild(LIBRARY_MANAGER_BASE_DIR_NAME);
+        storageDir = baseDir.getChild(STORAGE_DIR_NAME);
+        storageDirPath = storageDir.getFile().toPath();
+        trashDir = baseDir.getChild(TRASH_DIR_NAME);
+        trashDirPath = trashDir.getFile().toPath().normalize();
+        objectMapper = createObjectMapper();
     }
 
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final Map<Pair<DataverseName, String>, ILibrary> libraries = Collections.synchronizedMap(new HashMap<>());
-    private final IPersistedResourceRegistry reg;
-
-    public ExternalLibraryManager(File appDir, IPersistedResourceRegistry reg) {
-        this.reg = reg;
-        scanLibraries(appDir);
+    public void initStorage(boolean resetStorageData) throws HyracksDataException {
+        try {
+            Path baseDirPath = baseDir.getFile().toPath();
+            if (Files.isDirectory(baseDirPath)) {
+                if (resetStorageData) {
+                    FileUtils.cleanDirectory(baseDir.getFile());
+                    Files.createDirectory(storageDirPath);
+                    Files.createDirectory(trashDirPath);
+                    IoUtil.flushDirectory(baseDirPath);
+                } else {
+                    boolean createdDirs = false;
+                    if (!Files.isDirectory(storageDirPath)) {
+                        Files.deleteIfExists(storageDirPath);
+                        Files.createDirectory(storageDirPath);
+                        createdDirs = true;
+                    }
+                    if (Files.isDirectory(trashDirPath)) {
+                        FileUtils.cleanDirectory(trashDir.getFile());
+                    } else {
+                        Files.deleteIfExists(trashDirPath);
+                        Files.createDirectory(trashDirPath);
+                        createdDirs = true;
+                    }
+                    // TODO: clean up all rev_0 dirs whose corresponding rev_1 exists
+                    if (createdDirs) {
+                        IoUtil.flushDirectory(baseDirPath);
+                    }
+                }
+            } else {
+                FileUtil.forceMkdirs(baseDir.getFile());
+                Files.createDirectory(storageDirPath);
+                Files.createDirectory(trashDirPath);
+                // flush app dir's parent because we might've created app dir there
+                IoUtil.flushDirectory(baseDirPath.getParent().getParent());
+                // flush app dir (base dir's parent) because we might've created base dir there
+                IoUtil.flushDirectory(baseDirPath.getParent());
+                // flush base dir because we created storage/trash dirs there
+                IoUtil.flushDirectory(baseDirPath);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
     }
 
     @Override
-    public void scanLibraries(File appDir) {
-        File[] libs = appDir.listFiles((dir, name) -> dir.isDirectory());
-        if (libs != null) {
-            for (File lib : libs) {
-                String libraryPath = lib.getAbsolutePath();
+    public void start() {
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) {
+        synchronized (this) {
+            for (Map.Entry<Pair<DataverseName, String>, ILibrary> p : libraries.entrySet()) {
+                ILibrary library = p.getValue();
                 try {
-                    setUpDeployedLibrary(libraryPath);
-                } catch (AsterixException | IOException e) {
-                    LOGGER.error("Unable to properly initialize external libraries", e);
+                    library.close();
+                } catch (HyracksDataException e) {
+                    LOGGER.warn("Error closing library " + p.getKey().first + "." + p.getKey().second, e);
                 }
             }
         }
     }
 
-    public void register(DataverseName dataverseName, String libraryName, ILibrary library) {
-        Pair<DataverseName, String> key = getKey(dataverseName, libraryName);
-        libraries.put(key, library);
+    private FileReference getDataverseDir(DataverseName dataverseName) throws HyracksDataException {
+        return getChildFileRef(storageDir, dataverseName.getCanonicalForm());
     }
 
     @Override
-    public void deregister(DataverseName dataverseName, String libraryName) {
+    public FileReference getLibraryDir(DataverseName dataverseName, String libraryName) throws HyracksDataException {
+        FileReference dataverseDir = getDataverseDir(dataverseName);
+        return getChildFileRef(dataverseDir, libraryName);
+    }
+
+    @Override
+    public ILibrary getLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException {
         Pair<DataverseName, String> key = getKey(dataverseName, libraryName);
-        ILibrary cl = libraries.remove(key);
-        if (cl != null) {
-            cl.close();
+        synchronized (this) {
+            ILibrary library = libraries.get(key);
+            if (library == null) {
+                library = loadLibrary(dataverseName, libraryName);
+                libraries.put(key, library);
+            }
+            return library;
         }
     }
 
-    @Override
-    public void setUpDeployedLibrary(String libraryPath) throws IOException, AsterixException {
-        // get the installed library dirs
-        String[] parts = libraryPath.split(File.separator);
-        DataverseName catenatedDv = DataverseName.createFromCanonicalForm(parts[parts.length - 1]);
-        String name = catenatedDv.getParts().get(catenatedDv.getParts().size() - 1);
-        DataverseName dataverse = DataverseName.create(catenatedDv.getParts(), 0, catenatedDv.getParts().size() - 1);
+    private ILibrary loadLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException {
+        FileReference libRevDir = findLibraryRevDir(dataverseName, libraryName);
+        if (libRevDir == null) {
+            throw new HyracksDataException("Cannot find library: " + dataverseName + '.' + libraryName);
+        }
+        FileReference libContentsDir = libRevDir.getChild(CONTENTS_DIR_NAME);
+        if (!libContentsDir.getFile().isDirectory()) {
+            throw new HyracksDataException("Cannot find library: " + dataverseName + '.' + libraryName);
+        }
         try {
-            File langFile = new File(libraryPath, DESCRIPTOR_NAME);
-            final JsonNode jsonNode = OBJECT_MAPPER.readValue(Files.readAllBytes(langFile.toPath()), JsonNode.class);
-            LibraryDescriptor desc = (LibraryDescriptor) reg.deserialize(jsonNode);
-            ExternalFunctionLanguage libLang = desc.getLang();
+            FileReference descFile = libRevDir.getChild(DESCRIPTOR_FILE_NAME);
+            byte[] descData = Files.readAllBytes(descFile.getFile().toPath());
+            LibraryDescriptor desc = deserializeLibraryDescriptor(descData);
+            ExternalFunctionLanguage libLang = desc.getLanguage();
             switch (libLang) {
                 case JAVA:
-                    register(dataverse, name, new JavaLibrary(libraryPath));
-                    break;
+                    return new JavaLibrary(libContentsDir.getFile());
                 case PYTHON:
-                    register(dataverse, name, new PythonLibrary(libraryPath));
-                    break;
+                    return new PythonLibrary(libContentsDir.getFile());
                 default:
-                    throw new IllegalStateException("Library path file refers to unknown language");
+                    throw new HyracksDataException("Invalid language: " + libraryName);
             }
-        } catch (IOException | AsterixException e) {
-            LOGGER.error("Failed to initialized library", e);
-            throw e;
+        } catch (IOException e) {
+            LOGGER.error("Failed to initialize library " + dataverseName + '.' + libraryName, e);
+            throw HyracksDataException.create(e);
         }
     }
 
     @Override
-    public ILibrary getLibrary(DataverseName dataverseName, String libraryName) {
+    public byte[] serializeLibraryDescriptor(LibraryDescriptor libraryDescriptor) throws HyracksDataException {
+        try {
+            return objectMapper.writeValueAsBytes(libraryDescriptor.toJson(reg));
+        } catch (JsonProcessingException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private LibraryDescriptor deserializeLibraryDescriptor(byte[] data) throws IOException {
+        JsonNode jsonNode = objectMapper.readValue(data, JsonNode.class);
+        return (LibraryDescriptor) reg.deserialize(jsonNode);
+    }
+
+    private FileReference findLibraryRevDir(DataverseName dataverseName, String libraryName)
+            throws HyracksDataException {
+        FileReference libraryBaseDir = getLibraryDir(dataverseName, libraryName);
+        if (!libraryBaseDir.getFile().isDirectory()) {
+            return null;
+        }
+        FileReference libDirRev1 = libraryBaseDir.getChild(REV_1_DIR_NAME);
+        if (libDirRev1.getFile().isDirectory()) {
+            return libDirRev1;
+        }
+        FileReference libDirRev0 = libraryBaseDir.getChild(REV_0_DIR_NAME);
+        if (libDirRev0.getFile().isDirectory()) {
+            return libDirRev0;
+        }
+        return null;
+    }
+
+    @Override
+    public void closeLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException {
         Pair<DataverseName, String> key = getKey(dataverseName, libraryName);
-        return libraries.get(key);
+        ILibrary library;
+        synchronized (this) {
+            library = libraries.remove(key);
+        }
+        if (library != null) {
+            library.close();
+        }
+    }
+
    // State dumping is not supported by the library manager; intentionally a no-op.
    @Override
    public void dumpState(OutputStream os) {
    }
 
    // Cache key for a library: the (dataverse, library name) pair.
    private static Pair<DataverseName, String> getKey(DataverseName dataverseName, String libraryName) {
        return new Pair<>(dataverseName, libraryName);
    }
 
    /**
     * Drops a library path by atomically renaming it into a freshly created
     * temp directory under the trash dir, then scheduling an asynchronous
     * recursive delete of that directory on the NC work queue. The rename makes
     * the path disappear immediately; the slow delete happens off the caller's
     * thread.
     */
    @Override
    public void dropLibraryPath(FileReference fileRef) throws HyracksDataException {
        // does not flush any directories
        try {
            Path path = fileRef.getFile().toPath();
            // createTempDirectory yields a unique (empty) target for the rename
            Path trashPath = Files.createTempDirectory(trashDirPath, null);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Drop (move) {} into {}", path, trashPath);
            }
            Files.move(path, trashPath, StandardCopyOption.ATOMIC_MOVE);
            ncs.getWorkQueue().schedule(new DeleteDirectoryWork(trashPath));
        } catch (IOException e) {
            throw HyracksDataException.create(e);
        }
    }
+
+    private FileReference getChildFileRef(FileReference dir, String fileName) throws HyracksDataException {
+        Path dirPath = dir.getFile().toPath().toAbsolutePath().normalize();
+        FileReference fileRef = dir.getChild(fileName);
+        Path filePath = fileRef.getFile().toPath().toAbsolutePath().normalize();
+        if (!filePath.startsWith(dirPath)) {
+            throw new HyracksDataException("Invalid file name: " + fileName);
+        }
+        return fileRef;
+    }
+
+    private static ObjectMapper createObjectMapper() {
+        ObjectMapper om = new ObjectMapper();
+        om.enable(SerializationFeature.INDENT_OUTPUT);
+        om.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
+        om.configure(ORDER_MAP_ENTRIES_BY_KEYS, true);
+        return om;
+    }
+
+    private static final class DeleteDirectoryWork extends AbstractWork {
+
+        private final Path path;
+
+        private DeleteDirectoryWork(Path path) {
+            this.path = path;
+        }
+
+        @Override
+        public void run() {
+            try {
+                IoUtil.delete(path);
+            } catch (HyracksDataException e) {
+                LOGGER.warn("Error deleting " + path);
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
index ce23cac..3ff706d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
@@ -19,7 +19,7 @@
 
 package org.apache.asterix.external.library;
 
-import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
 import org.apache.asterix.om.types.IAType;
@@ -44,6 +44,6 @@
             argEvals[i] = args[i].createScalarEvaluator(context);
         }
         libraryManager =
-                ((IApplicationContext) context.getServiceContext().getApplicationContext()).getLibraryManager();
+                ((INcApplicationContext) context.getServiceContext().getApplicationContext()).getLibraryManager();
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaLibrary.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaLibrary.java
index c7e7c09..0ef651d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaLibrary.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaLibrary.java
@@ -26,9 +26,9 @@
 import java.net.MalformedURLException;
 import java.net.URL;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.library.ILibrary;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -38,22 +38,17 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    public JavaLibrary(String libraryPath) throws AsterixException, MalformedURLException {
-
-        File installDir = new File(libraryPath);
-
-        // get a reference to the specific library dir
-        File libDir = installDir;
+    public JavaLibrary(File libDir) throws HyracksDataException, MalformedURLException {
 
         FilenameFilter jarFileFilter = (dir, name) -> name.endsWith(".jar");
 
         // Get the jar file <Allow only a single jar file>
         String[] jarsInLibDir = libDir.list(jarFileFilter);
         if (jarsInLibDir.length > 1) {
-            throw new AsterixException("Incorrect library structure: found multiple library jars");
+            throw new HyracksDataException("Incorrect library structure: found multiple library jars");
         }
         if (jarsInLibDir.length <= 0) {
-            throw new AsterixException("Incorrect library structure: could not find library jar");
+            throw new HyracksDataException("Incorrect library structure: could not find library jar");
         }
 
         File libJar = new File(libDir, jarsInLibDir[0]);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibrary.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibrary.java
index 3ce3e91..8d388fd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibrary.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibrary.java
@@ -29,8 +29,8 @@
 
     private final File path;
 
-    PythonLibrary(String path) {
-        this.path = new File(path);
    // Python libraries are executed out-of-process; only the on-disk contents
    // directory is retained here (no classloading, unlike JavaLibrary).
    PythonLibrary(File libDir) {
        this.path = libDir;
    }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
new file mode 100644
index 0000000..5927145
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
/**
 * Base class for source/sink-less operators that manipulate the on-disk
 * deployment state of one external library on each node controller. Concrete
 * subclasses implement {@code execute()} and use the helpers below to work
 * with the library's directory layout ('rev_0'/'rev_1' revision dirs and a
 * 'stage' dir, per the ExternalLibraryManager constants).
 */
abstract class AbstractLibraryOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {

    private static final long serialVersionUID = 1L;

    // identity of the library this operator acts on
    protected final DataverseName dataverseName;

    protected final String libraryName;

    public AbstractLibraryOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
            String libraryName) {
        // 0 inputs, 0 outputs: pure side-effecting operator
        super(spec, 0, 0);
        this.dataverseName = dataverseName;
        this.libraryName = libraryName;
    }

    /**
     * Node pushable skeleton: resolves the NC-side library manager and the
     * library's base directory, then delegates to {@link #execute()}.
     */
    protected abstract class AbstractLibraryNodePushable extends AbstractOperatorNodePushable {

        protected final IHyracksTaskContext ctx;

        protected IIOManager ioManager;

        protected ILibraryManager libraryManager;

        private FileReference libraryDir;

        protected AbstractLibraryNodePushable(IHyracksTaskContext ctx) {
            this.ctx = ctx;
        }

        /** Subclass hook: performs the actual filesystem work. */
        protected abstract void execute() throws IOException;

        @Override
        public final void initialize() throws HyracksDataException {
            INcApplicationContext runtimeCtx =
                    (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
            ioManager = runtimeCtx.getIoManager();
            libraryManager = runtimeCtx.getLibraryManager();
            libraryDir = libraryManager.getLibraryDir(dataverseName, libraryName);
            try {
                execute();
            } catch (IOException e) {
                throw HyracksDataException.create(e);
            }
        }

        protected FileReference getLibraryDir() {
            return libraryDir;
        }

        // revision 0 directory of this library
        protected FileReference getRev0Dir() {
            return libraryDir.getChild(ExternalLibraryManager.REV_0_DIR_NAME);
        }

        // revision 1 directory of this library
        protected FileReference getRev1Dir() {
            return libraryDir.getChild(ExternalLibraryManager.REV_1_DIR_NAME);
        }

        // staging directory used while a deployment is in progress
        protected FileReference getStageDir() {
            return libraryDir.getChild(ExternalLibraryManager.STAGE_DIR_NAME);
        }

        // does not flush any directories
        protected void dropIfExists(FileReference fileRef) throws HyracksDataException {
            if (fileRef.getFile().exists()) {
                libraryManager.dropLibraryPath(fileRef);
            }
        }

        // drops dest (if present) then atomically renames src onto it;
        // does not flush the parent directory
        protected void move(FileReference src, FileReference dest) throws IOException {
            dropIfExists(dest);
            Files.move(src.getFile().toPath(), dest.getFile().toPath(), StandardCopyOption.ATOMIC_MOVE);
        }

        protected void mkdir(FileReference dir) throws IOException {
            Files.createDirectory(dir.getFile().toPath());
        }

        // fsync a directory so prior renames/creates in it become durable
        protected void flushDirectory(FileReference dir) throws IOException {
            flushDirectory(dir.getFile());
        }

        protected void flushDirectory(File dir) throws IOException {
            IoUtil.flushDirectory(dir);
        }

        protected void flushDirectory(Path dir) throws IOException {
            IoUtil.flushDirectory(dir);
        }

        // evicts and closes the cached in-memory instance of this library
        protected void closeLibrary() throws HyracksDataException {
            libraryManager.closeLibrary(dataverseName, libraryName);
        }

        @Override
        public final void deinitialize() {
        }

        @Override
        public final int getInputArity() {
            return 0;
        }

        @Override
        public final void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
        }

        @Override
        public IFrameWriter getInputFrameWriter(int index) {
            return null;
        }
    }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 569e5a1..27ae55c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.library.ILibrary;
 import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -50,7 +51,7 @@
 
     private static final String FEED_EXTENSION_NAME = "Feed";
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private static final Logger LOGGER = LogManager.getLogger();
 
@@ -62,6 +63,7 @@
     /** The adaptor factory that is used to create an instance of the feed adaptor **/
     private ITypedAdapterFactory adaptorFactory;
     /** The library that contains the adapter in use. **/
+    private DataverseName adaptorLibraryDataverse;
     private String adaptorLibraryName;
     /**
      * The adapter factory class that is used to create an instance of the feed adapter.
@@ -81,12 +83,13 @@
         this.outRecDescs[0] = rDesc;
     }
 
-    public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed feed, String adapterLibraryName,
-            String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor,
-            RecordDescriptor rDesc) {
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed feed, DataverseName adapterLibraryDataverse,
+            String adapterLibraryName, String adapterFactoryClassName, ARecordType adapterOutputType,
+            FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
         super(spec, 0, 1);
         this.feedId = new EntityId(FEED_EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
         this.adaptorFactoryClassName = adapterFactoryClassName;
+        this.adaptorLibraryDataverse = adapterLibraryDataverse;
         this.adaptorLibraryName = adapterLibraryName;
         this.adaptorConfiguration = feed.getConfiguration();
         this.adapterOutputType = adapterOutputType;
@@ -108,8 +111,7 @@
         INcApplicationContext runtimeCtx =
                 (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
         ILibraryManager libraryManager = runtimeCtx.getLibraryManager();
-
-        ILibrary lib = libraryManager.getLibrary(feedId.getDataverseName(), adaptorLibraryName);
+        ILibrary lib = libraryManager.getLibrary(adaptorLibraryDataverse, adaptorLibraryName);
         if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
             throw new HyracksDataException("Unexpected library language: " + lib.getLanguage());
         }
@@ -118,8 +120,7 @@
             try {
                 adapterFactory = (ITypedAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance());
                 adapterFactory.setOutputType(adapterOutputType);
-                adapterFactory.configure(ctx.getJobletContext().getServiceContext(), adaptorConfiguration,
-                        ctx.getWarningCollector());
+                adapterFactory.configure(null, adaptorConfiguration, ctx.getWarningCollector());
             } catch (Exception e) {
                 throw HyracksDataException.create(e);
             }
@@ -153,6 +154,10 @@
         return this.policyAccessor;
     }
 
+    public DataverseName getAdaptorLibraryDataverse() {
+        return adaptorLibraryDataverse;
+    }
+
     public String getAdaptorLibraryName() {
         return this.adaptorLibraryName;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployAbortOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployAbortOperatorDescriptor.java
new file mode 100644
index 0000000..6908361
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployAbortOperatorDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class LibraryDeployAbortOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = LogManager.getLogger(LibraryDeployAbortOperatorDescriptor.class);
+
+    public LibraryDeployAbortOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+            String libraryName) {
+        super(spec, dataverseName, libraryName);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractLibraryNodePushable(ctx) {
+            @Override
+            protected void execute() throws IOException {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Abort deployment of library {}.{}", dataverseName, libraryName);
+                }
+
+                FileReference libDir = getLibraryDir();
+
+                FileReference rev0 = getRev0Dir();
+                FileReference rev1 = getRev1Dir();
+                if (rev0.getFile().exists() || rev1.getFile().exists()) {
+                    // #. drop 'stage' dir.
+                    FileReference stage = getStageDir();
+                    dropIfExists(stage);
+                    flushDirectory(libDir);
+                } else {
+                    // #. drop the whole library dir
+                    dropIfExists(libDir);
+                    flushDirectory(libDir.getFile().getParentFile());
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployCommitOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployCommitOperatorDescriptor.java
new file mode 100644
index 0000000..30635a7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployCommitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class LibraryDeployCommitOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = LogManager.getLogger(LibraryDeployCommitOperatorDescriptor.class);
+
+    public LibraryDeployCommitOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+            String libraryName) {
+        super(spec, dataverseName, libraryName);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractLibraryNodePushable(ctx) {
+            @Override
+            protected void execute() throws IOException {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Commit deployment of library {}.{}", dataverseName, libraryName);
+                }
+
+                // #. rename 'stage' dir into 'rev_1' dir
+                FileReference rev1 = getRev1Dir();
+                FileReference stage = getStageDir();
+                move(stage, rev1);
+
+                // #. flush library dir
+                FileReference libDir = getLibraryDir();
+                flushDirectory(libDir);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
new file mode 100644
index 0000000..a576c34
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.library.LibraryDescriptor;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Hyracks operator that prepares an external library deployment on a node.
+ * It downloads the library archive from {@code libLocation} into a per-library 'stage'
+ * directory, extracts it (ZIP for Java libraries, PYZ plus bundled Pyro4/shim resources
+ * for Python libraries), writes a {@link LibraryDescriptor}, and rotates any existing
+ * 'rev_1' directory to 'rev_0'. The companion commit operator later promotes 'stage'
+ * to 'rev_1'. Directories are explicitly flushed after each step for durability.
+ */
+public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    // maximum number of HTTP download attempts before the job fails
+    private static final int DOWNLOAD_RETRY_COUNT = 10;
+
+    private static final Logger LOGGER = LogManager.getLogger(LibraryDeployPrepareOperatorDescriptor.class);
+
+    // implementation language of the library; drives archive-format validation and extraction
+    private final ExternalFunctionLanguage language;
+    // HTTP(S) location the library archive is downloaded from
+    private final URI libLocation;
+    // value sent as the Authorization header when downloading the archive
+    private final String authToken;
+
+    /**
+     * @param spec          job specification this operator is part of
+     * @param dataverseName dataverse that owns the library
+     * @param libraryName   name of the library being deployed
+     * @param language      implementation language (JAVA or PYTHON)
+     * @param libLocation   URI of the library archive to download
+     * @param authToken     Authorization header value for the download request
+     */
+    public LibraryDeployPrepareOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+            String libraryName, ExternalFunctionLanguage language, URI libLocation, String authToken) {
+        super(spec, dataverseName, libraryName);
+        this.language = language;
+        this.libLocation = libLocation;
+        this.authToken = authToken;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractLibraryNodePushable(ctx) {
+
+            // lazily-allocated scratch buffer reused by writeAndForce() copies
+            private byte[] copyBuffer;
+
+            @Override
+            protected void execute() throws IOException {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Prepare deployment of library {}.{}", dataverseName, libraryName);
+                }
+
+                // #. create library dir if necessary, clean 'stage' dir
+                FileReference libDir = getLibraryDir();
+                Path libDirPath = libDir.getFile().toPath();
+
+                FileReference stage = getStageDir();
+                if (Files.isDirectory(libDirPath)) {
+                    // library dir already exists: only discard a leftover stage dir
+                    dropIfExists(stage);
+                } else {
+                    // fresh deployment: rebuild the library dir and flush its ancestors
+                    dropIfExists(libDir);
+                    FileUtil.forceMkdirs(libDir.getFile());
+                    Path dataverseDir = libDirPath.getParent();
+                    flushDirectory(dataverseDir); // might've created this dir
+                    flushDirectory(dataverseDir.getParent()); // might've created this dir
+                }
+                mkdir(stage);
+
+                // #. download new content into 'stage' dir
+                fetch(stage);
+
+                // #. close the library (close its open files if any)
+                closeLibrary();
+
+                // #. if 'rev_1' dir exists then rename 'rev_1' dir to 'rev_0' dir.
+                FileReference rev1 = getRev1Dir();
+                if (rev1.getFile().exists()) {
+                    FileReference rev0 = getRev0Dir();
+                    move(rev1, rev0);
+                }
+
+                // #. flush library dir
+                flushDirectory(libDir);
+            }
+
+            /**
+             * Downloads the archive into {@code stageDir}, extracts it into the contents
+             * subdirectory according to {@code language}, writes the library descriptor,
+             * and flushes both directories.
+             */
+            private void fetch(FileReference stageDir) throws IOException {
+
+                String libLocationPath = libLocation.getPath();
+                String fileExt = FilenameUtils.getExtension(libLocationPath);
+
+                FileReference targetFile = stageDir.getChild("lib." + fileExt);
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Downloading library from {} into {}", libLocation, targetFile);
+                }
+                download(targetFile);
+
+                // extract from the archive
+                FileReference contentsDir = stageDir.getChild(ExternalLibraryManager.CONTENTS_DIR_NAME);
+                mkdir(contentsDir);
+
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Extracting library from {} into {}", targetFile, contentsDir);
+                }
+
+                // archive format must match the declared language; mismatch means the
+                // caller (CC) validated incorrectly, hence "shouldn't happen"
+                switch (language) {
+                    case JAVA:
+                        if (!LibraryDescriptor.FILE_EXT_ZIP.equals(fileExt)) {
+                            // shouldn't happen
+                            throw new IOException("Unexpected file type: " + fileExt);
+                        }
+                        unzip(targetFile, contentsDir);
+                        break;
+                    case PYTHON:
+                        if (!LibraryDescriptor.FILE_EXT_PYZ.equals(fileExt)) {
+                            // shouldn't happen
+                            throw new IOException("Unexpected file type: " + fileExt);
+                        }
+                        shiv(targetFile, stageDir, contentsDir);
+                        break;
+                    default:
+                        // shouldn't happen
+                        throw new IOException("Unexpected language: " + language);
+                }
+
+                // write library descriptor
+                FileReference targetDescFile = stageDir.getChild(DESCRIPTOR_FILE_NAME);
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace("Writing library descriptor into {}", targetDescFile);
+                }
+                writeDescriptor(targetDescFile, new LibraryDescriptor(language));
+
+                flushDirectory(contentsDir);
+                flushDirectory(stageDir);
+            }
+
+            /**
+             * Downloads {@code libLocation} into {@code targetFile} via HTTP GET, retrying
+             * up to {@link #DOWNLOAD_RETRY_COUNT} times and truncating the partial file
+             * between attempts. The file is fsynced before returning.
+             * NOTE(review): retries happen back-to-back with no backoff delay, and a new
+             * HttpClient is built per invocation -- acceptable for a rare admin operation,
+             * but worth confirming.
+             */
+            private void download(FileReference targetFile) throws HyracksException {
+                try {
+                    // NOTE(review): createNewFile() return value is ignored; presumably the
+                    // stage dir was just created so the file never pre-exists -- confirm.
+                    targetFile.getFile().createNewFile();
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
+                }
+                IFileHandle fHandle = ioManager.open(targetFile, IIOManager.FileReadWriteMode.READ_WRITE,
+                        IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                try {
+                    CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+                    try {
+                        // retry 10 times at maximum for downloading binaries
+                        HttpGet request = new HttpGet(libLocation);
+                        request.setHeader(HttpHeaders.AUTHORIZATION, authToken);
+                        int tried = 0;
+                        Exception trace = null;
+                        while (tried < DOWNLOAD_RETRY_COUNT) {
+                            tried++;
+                            CloseableHttpResponse response = null;
+                            try {
+                                response = httpClient.execute(request);
+                                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                                    throw new IOException("Http Error: " + response.getStatusLine().getStatusCode());
+                                }
+                                HttpEntity e = response.getEntity();
+                                if (e == null) {
+                                    throw new IOException("No response");
+                                }
+                                WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
+                                OutputStream outStream = Channels.newOutputStream(outChannel);
+                                e.writeTo(outStream);
+                                outStream.flush();
+                                ioManager.sync(fHandle, true);
+                                return;
+                            } catch (IOException e) {
+                                LOGGER.error("Unable to download library", e);
+                                trace = e;
+                                try {
+                                    // discard the partial download before retrying
+                                    ioManager.truncate(fHandle, 0);
+                                } catch (IOException e2) {
+                                    throw HyracksDataException.create(e2);
+                                }
+                            } finally {
+                                if (response != null) {
+                                    try {
+                                        response.close();
+                                    } catch (IOException e) {
+                                        LOGGER.warn("Failed to close", e);
+                                    }
+                                }
+                            }
+                        }
+
+                        // all attempts failed; surface the last error
+                        throw HyracksDataException.create(trace);
+                    } finally {
+                        try {
+                            httpClient.close();
+                        } catch (IOException e) {
+                            LOGGER.warn("Failed to close", e);
+                        }
+                    }
+                } finally {
+                    try {
+                        ioManager.close(fHandle);
+                    } catch (HyracksDataException e) {
+                        LOGGER.warn("Failed to close", e);
+                    }
+                }
+            }
+
+            /**
+             * Extracts {@code sourceFile} (a ZIP archive) into {@code outputDir}, rejecting
+             * entries that would escape the output dir (zip-slip), and flushing every
+             * directory created during extraction. Directory entries themselves are skipped;
+             * parent dirs are created on demand per file entry.
+             */
+            private void unzip(FileReference sourceFile, FileReference outputDir) throws IOException {
+                boolean logTraceEnabled = LOGGER.isTraceEnabled();
+                Set<Path> newDirs = new HashSet<>();
+                Path outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize();
+                try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) {
+                    Enumeration<? extends ZipEntry> entries = zipFile.entries();
+                    while (entries.hasMoreElements()) {
+                        ZipEntry entry = entries.nextElement();
+                        if (entry.isDirectory()) {
+                            continue;
+                        }
+                        Path entryOutputPath = outputDirPath.resolve(entry.getName()).toAbsolutePath().normalize();
+                        if (!entryOutputPath.startsWith(outputDirPath)) {
+                            // zip-slip guard: entry tried to escape the output directory
+                            throw new IOException("Malformed ZIP archive: " + entry.getName());
+                        }
+                        Path entryOutputDir = entryOutputPath.getParent();
+                        Files.createDirectories(entryOutputDir);
+                        // remember new directories so we can flush them later
+                        for (Path p = entryOutputDir; !p.equals(outputDirPath); p = p.getParent()) {
+                            newDirs.add(p);
+                        }
+                        try (InputStream in = zipFile.getInputStream(entry)) {
+                            FileReference entryOutputFileRef =
+                                    ioManager.resolveAbsolutePath(entryOutputPath.toString());
+                            if (logTraceEnabled) {
+                                LOGGER.trace("Extracting file {}", entryOutputFileRef);
+                            }
+                            writeAndForce(entryOutputFileRef, in);
+                        }
+                    }
+                }
+                for (Path newDir : newDirs) {
+                    flushDirectory(newDir);
+                }
+            }
+
+            /**
+             * Assembles a Python library: extracts the user's .pyz archive plus a bundled
+             * Pyro4 archive (shipped as a classpath resource) into {@code contentsDir},
+             * and drops the entrypoint.py shim alongside them.
+             */
+            private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir)
+                    throws IOException {
+                FileReference pyro4 = stageDir.getChild("pyro4.pyz");
+                writeShim(pyro4);
+                unzip(sourceFile, contentsDir);
+                unzip(pyro4, contentsDir);
+                writeShim(contentsDir.getChild("entrypoint.py"));
+                Files.delete(pyro4.getFile().toPath());
+            }
+
+            /**
+             * Copies the classpath resource whose name matches {@code outputFile}'s file
+             * name into {@code outputFile}, fsyncing it.
+             */
+            private void writeShim(FileReference outputFile) throws IOException {
+                InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName());
+                if (is == null) {
+                    throw new IOException("Classpath does not contain necessary Python resources!");
+                }
+                try {
+                    writeAndForce(outputFile, is);
+                } finally {
+                    is.close();
+                }
+            }
+
+            // Serializes the descriptor via the library manager and writes it durably.
+            private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException {
+                byte[] bytes = libraryManager.serializeLibraryDescriptor(desc);
+                writeAndForce(descFile, new ByteArrayInputStream(bytes));
+            }
+
+            /**
+             * Writes {@code dataStream} to {@code outputFile} through the IO manager and
+             * fsyncs the file before returning.
+             * NOTE(review): createNewFile() return value is ignored here as well; an
+             * existing file is silently overwritten from offset 0 -- confirm intended.
+             */
+            private void writeAndForce(FileReference outputFile, InputStream dataStream) throws IOException {
+                outputFile.getFile().createNewFile();
+                IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE,
+                        IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                try {
+                    WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
+                    OutputStream outputStream = Channels.newOutputStream(outChannel);
+                    IOUtils.copyLarge(dataStream, outputStream, getCopyBuffer());
+                    outputStream.flush();
+                    ioManager.sync(fHandle, true);
+                } finally {
+                    ioManager.close(fHandle);
+                }
+            }
+
+            // Lazily allocates the shared 4 KiB copy buffer (this pushable is single-threaded).
+            private byte[] getCopyBuffer() {
+                if (copyBuffer == null) {
+                    copyBuffer = new byte[4096];
+                }
+                return copyBuffer;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryUndeployOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryUndeployOperatorDescriptor.java
new file mode 100644
index 0000000..a146ee3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryUndeployOperatorDescriptor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Hyracks operator that undeploys an external library on a node: it closes the
+ * library (releasing any open files) and removes the whole library directory,
+ * flushing the parent directory so the removal is durable.
+ */
+public final class LibraryUndeployOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = LogManager.getLogger(LibraryUndeployOperatorDescriptor.class);
+
+    /**
+     * @param spec          job specification this operator is part of
+     * @param dataverseName dataverse that owns the library
+     * @param libraryName   name of the library being undeployed
+     */
+    public LibraryUndeployOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+            String libraryName) {
+        super(spec, dataverseName, libraryName);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractLibraryNodePushable(ctx) {
+            @Override
+            protected void execute() throws IOException {
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Undeploying library {}.{}", dataverseName, libraryName);
+                }
+
+                try {
+                    // #. close the library (close its open files if any)
+                    closeLibrary();
+                } finally {
+                    // #. drop the whole library dir
+                    // runs even if closeLibrary() throws, so the on-disk state is
+                    // cleaned up regardless of close failures
+                    FileReference libDir = getLibraryDir();
+                    dropIfExists(libDir);
+                    flushDirectory(libDir.getFile().getParentFile());
+                }
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 96f06f4..a4fdcfb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -44,7 +44,7 @@
     }
 
     // get adapter factory. this method has the side effect of modifying the configuration as necessary
-    public static ITypedAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName,
+    public static ITypedAdapterFactory getAdapterFactory(ICCServiceContext serviceCtx, String adapterName,
             Map<String, String> configuration, ARecordType itemType, ARecordType metaType,
             IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
         ExternalDataUtils.defaultConfiguration(configuration);
@@ -59,7 +59,7 @@
     }
 
     // get indexing adapter factory. this method has the side effect of modifying the configuration as necessary
-    public static IIndexingAdapterFactory getIndexingAdapterFactory(IServiceContext serviceCtx, String adapterName,
+    public static IIndexingAdapterFactory getIndexingAdapterFactory(ICCServiceContext serviceCtx, String adapterName,
             Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp,
             ARecordType metaType, IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
         ExternalDataUtils.defaultConfiguration(configuration);
@@ -73,7 +73,7 @@
     }
 
     // Lookup Adapters
-    public static LookupAdapterFactory<?> getLookupAdapterFactory(IServiceContext serviceCtx,
+    public static LookupAdapterFactory<?> getLookupAdapterFactory(ICCServiceContext serviceCtx,
             Map<String, String> configuration, ARecordType recordType, int[] ridFields, boolean retainInput,
             boolean retainMissing, IMissingWriterFactory missingWriterFactory, IWarningCollector warningCollector)
             throws HyracksDataException, AlgebricksException {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 5650759..8fa0aca 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -31,8 +31,6 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IInputStreamFactory;
@@ -42,6 +40,7 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class DatasourceFactoryProvider {
@@ -52,26 +51,27 @@
     private DatasourceFactoryProvider() {
     }
 
-    public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager,
-            Map<String, String> configuration) throws HyracksDataException, AsterixException {
+    public static IExternalDataSourceFactory getExternalDataSourceFactory(Map<String, String> configuration)
+            throws HyracksDataException, AsterixException {
         // Take a copy of the configuration
         if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) {
             String reader = configuration.get(ExternalDataConstants.KEY_READER);
-            return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration);
+            return DatasourceFactoryProvider.getRecordReaderFactory(reader, configuration);
         } else {
             // get stream source
             String streamSource = configuration.get(ExternalDataConstants.KEY_STREAM_SOURCE);
-            return DatasourceFactoryProvider.getInputStreamFactory(libraryManager, streamSource, configuration);
+            return DatasourceFactoryProvider.getInputStreamFactory(streamSource, configuration);
         }
     }
 
-    public static IInputStreamFactory getInputStreamFactory(ILibraryManager libraryManager, String streamSource,
-            Map<String, String> configuration) throws HyracksDataException {
+    public static IInputStreamFactory getInputStreamFactory(String streamSource, Map<String, String> configuration)
+            throws HyracksDataException {
         IInputStreamFactory streamSourceFactory;
         if (ExternalDataUtils.isExternal(streamSource)) {
-            DataverseName dataverse = ExternalDataUtils.getDataverse(configuration);
-            streamSourceFactory =
-                    ExternalDataUtils.createExternalInputStreamFactory(libraryManager, dataverse, streamSource);
+            //DataverseName dataverse = ExternalDataUtils.getDataverse(configuration);
+            //streamSourceFactory =
+            //        ExternalDataUtils.createExternalInputStreamFactory(libraryManager, dataverse, streamSource);
+            throw new NotImplementedException();
         } else {
             switch (streamSource) {
                 case ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS:
@@ -104,10 +104,11 @@
         }
     }
 
-    public static IRecordReaderFactory getRecordReaderFactory(ILibraryManager libraryManager, String adaptorName,
-            Map<String, String> configuration) throws HyracksDataException, AsterixException {
+    public static IRecordReaderFactory getRecordReaderFactory(String adaptorName, Map<String, String> configuration)
+            throws HyracksDataException, AsterixException {
         if (adaptorName.equals(ExternalDataConstants.EXTERNAL)) {
-            return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration);
+            //return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration);
+            throw new NotImplementedException();
         }
 
         if (factories == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 2265a25..4c10339 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -29,11 +29,11 @@
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.api.IDataParserFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 
 public class ParserFactoryProvider {
 
@@ -43,13 +43,13 @@
     private ParserFactoryProvider() {
     }
 
-    public static IDataParserFactory getDataParserFactory(ILibraryManager libraryManager,
-            Map<String, String> configuration) throws AsterixException {
+    public static IDataParserFactory getDataParserFactory(Map<String, String> configuration) throws AsterixException {
         IDataParserFactory parserFactory;
         String parserFactoryName = configuration.get(ExternalDataConstants.KEY_PARSER);
         if (ExternalDataUtils.isExternal(parserFactoryName)) {
-            return ExternalDataUtils.createExternalParserFactory(libraryManager,
-                    ExternalDataUtils.getDataverse(configuration), parserFactoryName);
+            //return ExternalDataUtils.createExternalParserFactory(libraryManager,
+            //        ExternalDataUtils.getDataverse(configuration), parserFactoryName);
+            throw new NotImplementedException();
         } else {
             String parserFactoryKey = ExternalDataUtils.getParserFactory(configuration);
             parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 052a8e0..b70c6ad2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -222,6 +222,7 @@
         return value == null ? false : Boolean.valueOf(value);
     }
 
+    // Currently not used.
     public static IRecordReaderFactory<?> createExternalRecordReaderFactory(ILibraryManager libraryManager,
             Map<String, String> configuration) throws AsterixException {
         String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
@@ -234,14 +235,19 @@
             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
                     + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
         }
-        String[] dataverseAndLibrary = libraryAndFactory[0].split(".");
+        String[] dataverseAndLibrary = libraryAndFactory[0].split("\\.");
         if (dataverseAndLibrary.length != 2) {
             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
                     + " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
         }
         DataverseName dataverseName = DataverseName.createSinglePartName(dataverseAndLibrary[0]); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
         String libraryName = dataverseAndLibrary[1];
-        ILibrary lib = libraryManager.getLibrary(dataverseName, libraryName);
+        ILibrary lib;
+        try {
+            lib = libraryManager.getLibrary(dataverseName, libraryName);
+        } catch (HyracksDataException e) {
+            throw new AsterixException("Cannot load library", e);
+        }
         if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
             throw new AsterixException("Unexpected library language: " + lib.getLanguage());
         }
@@ -253,12 +259,18 @@
         }
     }
 
+    // Currently not used.
     public static IDataParserFactory createExternalParserFactory(ILibraryManager libraryManager,
             DataverseName dataverse, String parserFactoryName) throws AsterixException {
         try {
             String library = parserFactoryName.substring(0,
                     parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
-            ILibrary lib = libraryManager.getLibrary(dataverse, library);
+            ILibrary lib;
+            try {
+                lib = libraryManager.getLibrary(dataverse, library);
+            } catch (HyracksDataException e) {
+                throw new AsterixException("Cannot load library", e);
+            }
             if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
                 throw new AsterixException("Unexpected library language: " + lib.getLanguage());
             }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 0bd601b..f36e35b 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -24,16 +24,13 @@
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.external.IDataSourceAdapter;
-import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.dataflow.TupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -52,20 +49,14 @@
 
     private Map<String, String> configuration;
 
-    private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
-
-    private transient IServiceContext serviceContext;
-
     @Override
     public String getAlias() {
         return "test_typed";
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
-        clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(
-                (ICcApplicationContext) serviceContext.getApplicationContext(), clusterLocations, 1);
-        return clusterLocations;
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        throw new IllegalStateException(); // shouldn't be called for external adapters
     }
 
     @Override
@@ -114,9 +105,8 @@
     }
 
     @Override
-    public void configure(IServiceContext serviceContext, Map<String, String> configuration,
+    public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
             IWarningCollector warningCollector) {
-        this.serviceContext = serviceContext;
         this.configuration = configuration;
     }
 
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
index 42f1f99..dbcfe0a 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
@@ -54,8 +54,7 @@
         format = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT);
         parserFormats.add(format);
         parserConf.put(ExternalDataConstants.KEY_FORMAT, format);
-        recordParserFactory =
-                (IRecordDataParserFactory<char[]>) ParserFactoryProvider.getDataParserFactory(null, parserConf);
+        recordParserFactory = (IRecordDataParserFactory<char[]>) ParserFactoryProvider.getDataParserFactory(parserConf);
         recordParserFactory.setRecordType(recordType);
         recordParserFactory.configure(configuration);
     }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index ba6e7b2..1e3e68f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -79,8 +79,10 @@
         DROP_FEED_POLICY,
         CREATE_FUNCTION,
         CREATE_ADAPTER,
+        CREATE_LIBRARY,
         FUNCTION_DROP,
         ADAPTER_DROP,
+        LIBRARY_DROP,
         CREATE_SYNONYM,
         SYNONYM_DROP,
         COMPACT,
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java
new file mode 100644
index 0000000..ebdeeef0
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.lang.common.statement;
+
+import java.net.URI;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.AbstractStatement;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+
+public final class CreateLibraryStatement extends AbstractStatement {
+
+    private final DataverseName dataverseName;
+    private final String libraryName;
+    private final ExternalFunctionLanguage lang;
+    private final URI location;
+    private final boolean replaceIfExists;
+    private final String authToken;
+
+    public CreateLibraryStatement(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage lang,
+            URI location, boolean replaceIfExists, String authToken) {
+        this.dataverseName = dataverseName;
+        this.libraryName = libraryName;
+        this.lang = lang;
+        this.location = location;
+        this.replaceIfExists = replaceIfExists;
+        this.authToken = authToken;
+    }
+
+    public DataverseName getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getLibraryName() {
+        return libraryName;
+    }
+
+    public ExternalFunctionLanguage getLang() {
+        return lang;
+    }
+
+    public URI getLocation() {
+        return location;
+    }
+
+    public boolean getReplaceIfExists() {
+        return replaceIfExists;
+    }
+
+    public String getAuthToken() {
+        return authToken;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.CREATE_LIBRARY;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/LibraryDropStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/LibraryDropStatement.java
new file mode 100644
index 0000000..09ea3a8
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/LibraryDropStatement.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.AbstractStatement;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+
+public final class LibraryDropStatement extends AbstractStatement {
+
+    private final DataverseName dataverseName;
+    private final String libraryName;
+
+    public LibraryDropStatement(DataverseName dataverseName, String libraryName) {
+        this.dataverseName = dataverseName;
+        this.libraryName = libraryName;
+    }
+
+    public DataverseName getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getLibraryName() {
+        return libraryName;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.LIBRARY_DROP;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 47e934e..62763d1 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -72,6 +72,7 @@
 import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
 import org.apache.asterix.lang.common.statement.CreateSynonymStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -87,6 +88,7 @@
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
 import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -853,6 +855,18 @@
     }
 
     @Override
+    public Void visit(CreateLibraryStatement cls, Integer arg) throws CompilationException {
+        // this statement is internal
+        return null;
+    }
+
+    @Override
+    public Void visit(LibraryDropStatement del, Integer arg) throws CompilationException {
+        // this statement is internal
+        return null;
+    }
+
+    @Override
     public Void visit(ListSliceExpression expression, Integer step) throws CompilationException {
         out.println(skip(step) + "ListSliceExpression [");
         expression.getExpr().accept(this, step + 1);
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
index f6b2f8d..9f4571d 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
 import org.apache.asterix.lang.common.statement.CreateSynonymStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -46,6 +47,7 @@
 import org.apache.asterix.lang.common.statement.FunctionDropStatement;
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
 import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
 import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -255,4 +257,14 @@
     public R visit(SynonymDropStatement del, T arg) throws CompilationException {
         return null;
     }
+
+    @Override
+    public R visit(CreateLibraryStatement cls, T arg) throws CompilationException {
+        return null;
+    }
+
+    @Override
+    public R visit(LibraryDropStatement del, T arg) throws CompilationException {
+        return null;
+    }
 }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
index 5a9d68f..2957509 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
@@ -50,6 +50,7 @@
 import org.apache.asterix.lang.common.statement.CreateFeedStatement;
 import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
 import org.apache.asterix.lang.common.statement.CreateSynonymStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -63,6 +64,7 @@
 import org.apache.asterix.lang.common.statement.FunctionDropStatement;
 import org.apache.asterix.lang.common.statement.IndexDropStatement;
 import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
 import org.apache.asterix.lang.common.statement.LoadStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -182,6 +184,10 @@
 
     R visit(AdapterDropStatement del, T arg) throws CompilationException;
 
+    R visit(CreateLibraryStatement cls, T arg) throws CompilationException;
+
+    R visit(LibraryDropStatement del, T arg) throws CompilationException;
+
     R visit(CreateSynonymStatement css, T arg) throws CompilationException;
 
     R visit(SynonymDropStatement del, T arg) throws CompilationException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index e06e22f..7f50cab 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -962,6 +962,18 @@
     }
 
     @Override
+    public void updateLibrary(MetadataTransactionContext ctx, Library library) throws AlgebricksException {
+        try {
+            metadataNode.updateLibrary(ctx.getTxnId(), library);
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+        // reflect the library into the cache
+        ctx.dropLibrary(library.getDataverseName(), library.getName());
+        ctx.addLibrary(library);
+    }
+
+    @Override
     public <T extends IExtensionMetadataEntity> void addEntity(MetadataTransactionContext mdTxnCtx, T entity)
             throws AlgebricksException {
         try {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index ca98328..6fd6376 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -525,73 +525,63 @@
                 dropSynonym(txnId, dataverseName, synonym.getSynonymName());
             }
 
-            // As a side effect, acquires an S lock on the 'Function' dataset
-            // on behalf of txnId.
-            List<Function> dataverseFunctions = getDataverseFunctions(txnId, dataverseName);
-            // Drop all functions in this dataverse.
-            for (Function function : dataverseFunctions) {
-                dropFunction(txnId, new FunctionSignature(dataverseName, function.getName(), function.getArity()),
-                        true);
-            }
-
-            //Drop libraries, similarly.
-            for (Library lib : getDataverseLibraries(txnId, dataverseName)) {
-                dropLibrary(txnId, lib.getDataverseName(), lib.getName());
-            }
-
-            List<Dataset> dataverseDatasets;
-            Dataset ds;
-            dataverseDatasets = getDataverseDatasets(txnId, dataverseName);
-            // Drop all datasets in this dataverse.
-            for (int i = 0; i < dataverseDatasets.size(); i++) {
-                ds = dataverseDatasets.get(i);
-                dropDataset(txnId, dataverseName, ds.getDatasetName(), true);
-            }
-
-            // After dropping datasets, drop datatypes
-            List<Datatype> dataverseDatatypes;
-            // As a side effect, acquires an S lock on the 'datatype' dataset
-            // on behalf of txnId.
-            dataverseDatatypes = getDataverseDatatypes(txnId, dataverseName);
-            // Drop all types in this dataverse.
-            for (int i = 0; i < dataverseDatatypes.size(); i++) {
-                forceDropDatatype(txnId, dataverseName, dataverseDatatypes.get(i).getDatatypeName());
-            }
-
-            // As a side effect, acquires an S lock on the 'Adapter' dataset
-            // on behalf of txnId.
-            List<DatasourceAdapter> dataverseAdapters = getDataverseAdapters(txnId, dataverseName);
-            // Drop all functions in this dataverse.
-            for (DatasourceAdapter adapter : dataverseAdapters) {
-                dropAdapter(txnId, dataverseName, adapter.getAdapterIdentifier().getName());
-            }
-
-            List<Feed> dataverseFeeds;
-            List<FeedConnection> feedConnections;
-            Feed feed;
-            dataverseFeeds = getDataverseFeeds(txnId, dataverseName);
-            // Drop all feeds&connections in this dataverse.
-            for (int i = 0; i < dataverseFeeds.size(); i++) {
-                feed = dataverseFeeds.get(i);
-                feedConnections = getFeedConnections(txnId, dataverseName, feed.getFeedName());
+            // Drop all feeds and connections in this dataverse.
+            // Feeds may depend on datatypes and adapters
+            List<Feed> dataverseFeeds = getDataverseFeeds(txnId, dataverseName);
+            for (Feed feed : dataverseFeeds) {
+                List<FeedConnection> feedConnections = getFeedConnections(txnId, dataverseName, feed.getFeedName());
                 for (FeedConnection feedConnection : feedConnections) {
                     dropFeedConnection(txnId, dataverseName, feed.getFeedName(), feedConnection.getDatasetName());
                 }
                 dropFeed(txnId, dataverseName, feed.getFeedName());
             }
 
+            // Drop all feed ingestion policies in this dataverse.
             List<FeedPolicyEntity> feedPolicies = getDataverseFeedPolicies(txnId, dataverseName);
-            if (feedPolicies != null && !feedPolicies.isEmpty()) {
-                // Drop all feed ingestion policies in this dataverse.
-                for (FeedPolicyEntity feedPolicy : feedPolicies) {
-                    dropFeedPolicy(txnId, dataverseName, feedPolicy.getPolicyName());
-                }
+            for (FeedPolicyEntity feedPolicy : feedPolicies) {
+                dropFeedPolicy(txnId, dataverseName, feedPolicy.getPolicyName());
+            }
+
+            // Drop all functions in this dataverse.
+            // Functions may depend on datatypes and libraries
+            // As a side effect, acquires an S lock on the 'Function' dataset on behalf of txnId.
+            List<Function> dataverseFunctions = getDataverseFunctions(txnId, dataverseName);
+            for (Function function : dataverseFunctions) {
+                dropFunction(txnId, new FunctionSignature(dataverseName, function.getName(), function.getArity()),
+                        true);
+            }
+
+            // Drop all adapters in this dataverse.
+            // Adapters depend on libraries.
+            // As a side effect, acquires an S lock on the 'Adapter' dataset on behalf of txnId.
+            List<DatasourceAdapter> dataverseAdapters = getDataverseAdapters(txnId, dataverseName);
+            for (DatasourceAdapter adapter : dataverseAdapters) {
+                dropAdapter(txnId, dataverseName, adapter.getAdapterIdentifier().getName());
+            }
+
+            // Drop all libraries in this dataverse.
+            List<Library> dataverseLibraries = getDataverseLibraries(txnId, dataverseName);
+            for (Library lib : dataverseLibraries) {
+                dropLibrary(txnId, lib.getDataverseName(), lib.getName());
+            }
+
+            // Drop all datasets and indexes in this dataverse.
+            // Datasets depend on datatypes
+            List<Dataset> dataverseDatasets = getDataverseDatasets(txnId, dataverseName);
+            for (Dataset ds : dataverseDatasets) {
+                dropDataset(txnId, dataverseName, ds.getDatasetName(), true);
+            }
+
+            // Drop all types in this dataverse.
+            // As a side effect, acquires an S lock on the 'datatype' dataset on behalf of txnId.
+            List<Datatype> dataverseDatatypes = getDataverseDatatypes(txnId, dataverseName);
+            for (Datatype dataverseDatatype : dataverseDatatypes) {
+                forceDropDatatype(txnId, dataverseName, dataverseDatatype.getDatatypeName());
             }
 
             // Delete the dataverse entry from the 'dataverse' dataset.
+            // As a side effect, acquires an S lock on the 'dataverse' dataset on behalf of txnId.
             ITupleReference searchKey = createTuple(dataverseName);
-            // As a side effect, acquires an S lock on the 'dataverse' dataset
-            // on behalf of txnId.
             ITupleReference tuple = getTupleToBeDeleted(txnId, MetadataPrimaryIndexes.DATAVERSE_DATASET, searchKey);
             deleteTupleFromIndex(txnId, MetadataPrimaryIndexes.DATAVERSE_DATASET, tuple);
         } catch (HyracksDataException e) {
@@ -911,6 +901,19 @@
         }
     }
 
+    public List<DatasourceAdapter> getAllAdapters(TxnId txnId) throws AlgebricksException {
+        try {
+            DatasourceAdapterTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getAdapterTupleTranslator(false);
+            List<DatasourceAdapter> results = new ArrayList<>();
+            IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            searchIndex(txnId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, null, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     private void confirmDataverseCanBeDeleted(TxnId txnId, DataverseName dataverseName) throws AlgebricksException {
         // If a dataset from a DIFFERENT dataverse
         // uses a type from this dataverse
@@ -991,6 +994,37 @@
         }
     }
 
+    private void confirmLibraryCanBeDeleted(TxnId txnId, DataverseName dataverseName, String libraryName)
+            throws AlgebricksException {
+        confirmLibraryIsUnusedByFunctions(txnId, dataverseName, libraryName);
+        confirmLibraryIsUnusedByAdapters(txnId, dataverseName, libraryName);
+    }
+
+    private void confirmLibraryIsUnusedByFunctions(TxnId txnId, DataverseName dataverseName, String libraryName)
+            throws AlgebricksException {
+        List<Function> functions = getAllFunctions(txnId);
+        for (Function function : functions) {
+            if (libraryName.equals(function.getLibrary()) && dataverseName.equals(function.getDataverseName())) {
+                throw new AlgebricksException(
+                        "Cannot drop library " + dataverseName + '.' + libraryName + " being used by function "
+                                + function.getDataverseName() + '.' + function.getName() + '@' + function.getArity());
+            }
+        }
+    }
+
+    private void confirmLibraryIsUnusedByAdapters(TxnId txnId, DataverseName dataverseName, String libraryName)
+            throws AlgebricksException {
+        List<DatasourceAdapter> adapters = getAllAdapters(txnId);
+        for (DatasourceAdapter adapter : adapters) {
+            if (libraryName.equals(adapter.getLibraryName())
+                    && adapter.getAdapterIdentifier().getDataverseName().equals(dataverseName)) {
+                throw new AlgebricksException("Cannot drop library " + dataverseName + '.' + libraryName
+                        + " being used by adapter " + adapter.getAdapterIdentifier().getDataverseName() + '.'
+                        + adapter.getAdapterIdentifier().getName());
+            }
+        }
+    }
+
     private void confirmDatatypeIsUnused(TxnId txnId, DataverseName dataverseName, String datatypeName)
             throws AlgebricksException {
         confirmDatatypeIsUnusedByDatatypes(txnId, dataverseName, datatypeName);
@@ -1582,10 +1616,8 @@
 
     @Override
     public void dropLibrary(TxnId txnId, DataverseName dataverseName, String libraryName) throws AlgebricksException {
-        Library library = getLibrary(txnId, dataverseName, libraryName);
-        if (library == null) {
-            throw new AlgebricksException("Cannot drop library '" + library + "' because it doesn't exist.");
-        }
+        confirmLibraryCanBeDeleted(txnId, dataverseName, libraryName);
+
         try {
             // Delete entry from the 'Library' dataset.
             ITupleReference searchKey = createTuple(dataverseName, libraryName);
@@ -1597,7 +1629,7 @@
         } catch (HyracksDataException e) {
             if (e.getComponent().equals(ErrorCode.HYRACKS)
                     && e.getErrorCode() == ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY) {
-                throw new AlgebricksException("Cannot drop library '" + libraryName, e);
+                throw new AlgebricksException("Cannot drop library '" + libraryName + "' because it doesn't exist", e);
             } else {
                 throw new AlgebricksException(e);
             }
@@ -2026,6 +2058,27 @@
         }
     }
 
+    @Override
+    public void updateLibrary(TxnId txnId, Library library) throws AlgebricksException {
+        try {
+            // This method will delete previous entry of the library and insert the new one
+            // Delete entry from the 'library' dataset.
+            ITupleReference searchKey = createTuple(library.getDataverseName(), library.getName());
+            // Searches the index for the tuple to be deleted. Acquires an S
+            // lock on the 'library' dataset.
+            ITupleReference libraryTuple =
+                    getTupleToBeDeleted(txnId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey);
+            deleteTupleFromIndex(txnId, MetadataPrimaryIndexes.LIBRARY_DATASET, libraryTuple);
+            // Previous tuple was deleted
+            // Insert into the 'library' dataset.
+            LibraryTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getLibraryTupleTranslator(true);
+            libraryTuple = tupleReaderWriter.getTupleFromMetadataEntity(library);
+            insertTupleIntoIndex(txnId, MetadataPrimaryIndexes.LIBRARY_DATASET, libraryTuple);
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     public ITxnIdFactory getTxnIdFactory() {
         return txnIdFactory;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 0de138f..90fe3a5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -171,7 +171,7 @@
     }
 
     public void dropLibrary(DataverseName dataverseName, String libraryName) {
-        Library library = new Library(dataverseName, libraryName);
+        Library library = new Library(dataverseName, libraryName, null, MetadataUtil.PENDING_NO_OP);
         droppedCache.addLibraryIfNotExists(library);
         logAndApply(new MetadataLogicalOperation(library, false));
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index fbacac1..09a4a94 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -703,6 +703,16 @@
     void updateDataset(MetadataTransactionContext ctx, Dataset dataset) throws AlgebricksException;
 
     /**
+     * Update an existing library in metadata.
+     *
+     * @param ctx
+     *            MetadataTransactionContext of an active metadata transaction.
+     * @param library
+     *            Existing Library.
+     */
+    void updateLibrary(MetadataTransactionContext ctx, Library library) throws AlgebricksException;
+
+    /**
      * Add an extension entity to its extension dataset under the ongoing metadata
      * transaction
      *
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index 8a052b7..0330008 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -803,6 +803,17 @@
     void updateDataset(TxnId txnId, Dataset dataset) throws AlgebricksException, RemoteException;
 
     /**
+     * update an existing library in the metadata, acquiring local locks on behalf
+     * of the given transaction id.
+     *
+     * @param txnId
+     *            A globally unique id for an active metadata transaction.
+     * @param library
+     *            updated Library instance.
+     */
+    void updateLibrary(TxnId txnId, Library library) throws AlgebricksException, RemoteException;
+
+    /**
      * Adds an extension entity under the ongoing transaction job id
      *
      * @param txnId
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 1962559..a1d6743 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -75,7 +75,7 @@
     public static final String FIELD_NAME_IS_PRIMARY = "IsPrimary";
     public static final String FIELD_NAME_KIND = "Kind";
     public static final String FIELD_NAME_LANGUAGE = "Language";
-    public static final String FIELD_NAME_LIBRARY = "Library";
+    public static final String FIELD_NAME_LIBRARY_NAME = "LibraryName";
     public static final String FIELD_NAME_LAST_REFRESH_TIME = "LastRefreshTime";
     public static final String FIELD_NAME_METATYPE_DATAVERSE_NAME = "MetatypeDataverseName";
     public static final String FIELD_NAME_METATYPE_NAME = "MetatypeName";
@@ -370,8 +370,6 @@
     public static final int DATASOURCE_ADAPTER_ARECORD_TIMESTAMP_FIELD_INDEX = 4;
     //open types
 
-    public static final String DATASOURCE_ARECORD_FUNCTION_LIBRARY_FIELD_NAME = "Library";
-
     public static final ARecordType DATASOURCE_ADAPTER_RECORDTYPE = createRecordType(
             // RecordTypeName
             RECORD_NAME_DATASOURCE_ADAPTER,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 3f5d0b0..8597a21 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -50,6 +50,7 @@
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
+import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -62,7 +63,6 @@
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedConstants;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -488,10 +488,10 @@
                         policyAccessor, factoryOutput.second);
                 break;
             case EXTERNAL:
-                String libraryName = feed.getConfiguration().get(ExternalDataConstants.KEY_ADAPTER_NAME).trim()
-                        .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
-                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, libraryName,
-                        adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second);
+                ExternalAdapterFactory extAdapterFactory = (ExternalAdapterFactory) adapterFactory;
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, extAdapterFactory.getLibraryDataverse(),
+                        extAdapterFactory.getLibraryName(), extAdapterFactory.getClassName(), recordType,
+                        policyAccessor, factoryOutput.second);
                 break;
             default:
                 break;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
index f1a8f60..5e0ece3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
@@ -25,25 +25,27 @@
 
 public class DatasourceAdapter implements IMetadataEntity<DatasourceAdapter> {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final AdapterIdentifier adapterIdentifier;
     private final String classname;
     private final AdapterType type;
-    private final String library;
+    // TODO: also need libraryDataverse (the library is currently assumed to live in the adapter's dataverse)
+    private final String libraryName;
 
     public DatasourceAdapter(AdapterIdentifier adapterIdentifier, String classname, AdapterType type) {
         this.adapterIdentifier = adapterIdentifier;
         this.classname = classname;
         this.type = type;
-        this.library = null;
+        this.libraryName = null;
     }
 
-    public DatasourceAdapter(AdapterIdentifier adapterIdentifier, String classname, AdapterType type, String library) {
+    public DatasourceAdapter(AdapterIdentifier adapterIdentifier, String classname, AdapterType type,
+            String libraryName) {
         this.adapterIdentifier = adapterIdentifier;
         this.classname = classname;
         this.type = type;
-        this.library = library;
+        this.libraryName = libraryName;
     }
 
     @Override
@@ -68,8 +70,7 @@
         return type;
     }
 
-    public String getLibrary() {
-        return library;
+    public String getLibraryName() {
+        return libraryName;
     }
-
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
index 87ead6a..494b5f1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
@@ -28,10 +28,14 @@
 
     private final DataverseName dataverse;
     private final String name;
+    private final String language;
+    private final int pendingOp;
 
-    public Library(DataverseName dataverseName, String libraryName) {
+    public Library(DataverseName dataverseName, String libraryName, String language, int pendingOp) {
         this.dataverse = dataverseName;
         this.name = libraryName;
+        this.language = language;
+        this.pendingOp = pendingOp;
     }
 
     public DataverseName getDataverseName() {
@@ -42,6 +46,14 @@
         return name;
     }
 
+    public String getLanguage() {
+        return language;
+    }
+
+    public int getPendingOp() {
+        return pendingOp;
+    }
+
     @Override
     public Library addToCache(MetadataCache cache) {
         return cache.addLibraryIfNotExists(this);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index eb37a60..6279451 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -19,8 +19,6 @@
 
 package org.apache.asterix.metadata.entitytupletranslators;
 
-import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.DATASOURCE_ARECORD_FUNCTION_LIBRARY_FIELD_NAME;
-
 import java.util.Calendar;
 
 import org.apache.asterix.common.external.IDataSourceAdapter;
@@ -31,7 +29,6 @@
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
@@ -59,17 +56,12 @@
                 ((AString) adapterRecord.getValueByPos(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_TYPE_FIELD_INDEX))
                         .getStringValue());
 
-        String library = getAdapterLibrary(adapterRecord);
+        int libraryNameIdx = adapterRecord.getType().getFieldIndex(MetadataRecordTypes.FIELD_NAME_LIBRARY_NAME);
+        String libraryName =
+                libraryNameIdx >= 0 ? ((AString) adapterRecord.getValueByPos(libraryNameIdx)).getStringValue() : null;
 
         return new DatasourceAdapter(new AdapterIdentifier(dataverseName, adapterName), classname, adapterType,
-                library);
-    }
-
-    private String getAdapterLibrary(ARecord adapterRecord) {
-        final ARecordType adapterType = adapterRecord.getType();
-        final int adapterLibraryIdx = adapterType.getFieldIndex(DATASOURCE_ARECORD_FUNCTION_LIBRARY_FIELD_NAME);
-        return adapterLibraryIdx >= 0 ? ((AString) adapterRecord.getValueByPos(adapterLibraryIdx)).getStringValue()
-                : null;
+                libraryName);
     }
 
     @Override
@@ -121,30 +113,30 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
 
+        // write open fields
+        writeOpenFields(adapter);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();
 
         tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-
-        writeOpenTypes(adapter);
-
         return tuple;
     }
 
-    void writeOpenTypes(DatasourceAdapter adapter) throws HyracksDataException {
+    void writeOpenFields(DatasourceAdapter adapter) throws HyracksDataException {
         writeLibrary(adapter);
     }
 
     protected void writeLibrary(DatasourceAdapter adapter) throws HyracksDataException {
-        if (null == adapter.getLibrary()) {
+        if (adapter.getLibraryName() == null) {
             return;
         }
         fieldName.reset();
-        aString.setValue(DATASOURCE_ARECORD_FUNCTION_LIBRARY_FIELD_NAME);
+        aString.setValue(MetadataRecordTypes.FIELD_NAME_LIBRARY_NAME);
         stringSerde.serialize(aString, fieldName.getDataOutput());
         fieldValue.reset();
-        aString.setValue(adapter.getLibrary());
+        aString.setValue(adapter.getLibraryName());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(fieldName, fieldValue);
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
index f521001..4792398 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
@@ -19,14 +19,21 @@
 
 package org.apache.asterix.metadata.entitytupletranslators;
 
+import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_LANGUAGE;
+import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_PENDING_OP;
+
 import java.util.Calendar;
 
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
 import org.apache.asterix.metadata.entities.Library;
+import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.ARecord;
 import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
@@ -52,7 +59,16 @@
                 ((AString) libraryRecord.getValueByPos(MetadataRecordTypes.LIBRARY_ARECORD_NAME_FIELD_INDEX))
                         .getStringValue();
 
-        return new Library(dataverseName, libraryName);
+        ARecordType libraryRecordType = libraryRecord.getType();
+        int pendingOpIdx = libraryRecordType.getFieldIndex(FIELD_NAME_PENDING_OP);
+        int pendingOp = pendingOpIdx >= 0 ? ((AInt32) libraryRecord.getValueByPos(pendingOpIdx)).getIntegerValue()
+                : MetadataUtil.PENDING_NO_OP;
+
+        int languageIdx = libraryRecordType.getFieldIndex(FIELD_NAME_LANGUAGE);
+        String language = languageIdx >= 0 ? ((AString) libraryRecord.getValueByPos(languageIdx)).getStringValue()
+                : ExternalFunctionLanguage.JAVA.name();
+
+        return new Library(dataverseName, libraryName, language, pendingOp);
     }
 
     @Override
@@ -90,6 +106,8 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.LIBRARY_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
 
+        writeOpenFields(library);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();
@@ -97,4 +115,34 @@
         tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
         return tuple;
     }
+
+    protected void writeOpenFields(Library library) throws HyracksDataException {
+        writeLanguage(library);
+        writePendingOp(library);
+    }
+
+    private void writeLanguage(Library library) throws HyracksDataException {
+        String language = library.getLanguage();
+
+        fieldName.reset();
+        aString.setValue(FIELD_NAME_LANGUAGE);
+        stringSerde.serialize(aString, fieldName.getDataOutput());
+        fieldValue.reset();
+        aString.setValue(language);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(fieldName, fieldValue);
+    }
+
+    private void writePendingOp(Library library) throws HyracksDataException {
+        int pendingOp = library.getPendingOp();
+
+        if (pendingOp != MetadataUtil.PENDING_NO_OP) {
+            fieldName.reset();
+            aString.setValue(FIELD_NAME_PENDING_OP);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            int32Serde.serialize(new AInt32(pendingOp), fieldValue.getDataOutput());
+            recordBuilder.addField(fieldName, fieldValue);
+        }
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 24301e0..466778c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -32,12 +32,11 @@
 import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
-import org.apache.asterix.common.library.ILibrary;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -50,6 +49,7 @@
 import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.Library;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -115,7 +115,7 @@
             // Get adapter from metadata dataset <Metadata dataverse>
             String adapterName = configuration.get(ExternalDataConstants.KEY_ADAPTER_NAME);
             if (adapterName == null) {
-                throw new AlgebricksException("cannot find adatper name");
+                throw new AlgebricksException("cannot find adapter name");
             }
             DatasourceAdapter adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
                     MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
@@ -133,14 +133,7 @@
                         adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
                         break;
                     case EXTERNAL:
-                        String[] anameComponents = adapterName.split("#");
-                        String libraryName = anameComponents[0];
-                        ILibrary lib = appCtx.getLibraryManager().getLibrary(feed.getDataverseName(), libraryName);
-                        if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
-                            throw new HyracksDataException("Unexpected library language: " + lib.getLanguage());
-                        }
-                        ClassLoader cl = ((JavaLibrary) lib).getClassLoader();
-                        adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+                        adapterFactory = createExternalAdapterFactory(mdTxnCtx, adapterEntity, adapterFactoryClassname);
                         break;
                     default:
                         throw new AsterixException("Unknown Adapter type " + adapterType);
@@ -174,6 +167,22 @@
         }
     }
 
+    private static ITypedAdapterFactory createExternalAdapterFactory(MetadataTransactionContext mdTxnCtx,
+            DatasourceAdapter adapterEntity, String adapterFactoryClassname)
+            throws AlgebricksException, RemoteException, HyracksDataException {
+        // TODO: library dataverse must be explicitly specified in the adapter entity
+        DataverseName libraryDataverse = adapterEntity.getAdapterIdentifier().getDataverseName();
+        String libraryName = adapterEntity.getLibraryName();
+        Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDataverse, libraryName);
+        if (library == null) {
+            throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, libraryName);
+        }
+        if (!ExternalFunctionLanguage.JAVA.name().equals(library.getLanguage())) {
+            throw new HyracksDataException("Unexpected library language: " + library.getLanguage());
+        }
+        return new ExternalAdapterFactory(libraryDataverse, libraryName, adapterFactoryClassname);
+    }
+
     @SuppressWarnings("rawtypes")
     public static Triple<ITypedAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed,
             FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx, ICcApplicationContext appCtx)
@@ -209,14 +218,7 @@
                         adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
                         break;
                     case EXTERNAL:
-                        String[] anameComponents = adapterName.split("#");
-                        String libraryName = anameComponents[0];
-                        ILibrary lib = appCtx.getLibraryManager().getLibrary(feed.getDataverseName(), libraryName);
-                        if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
-                            throw new HyracksDataException("Unexpected library language: " + lib.getLanguage());
-                        }
-                        ClassLoader cl = ((JavaLibrary) lib).getClassLoader();
-                        adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+                        adapterFactory = createExternalAdapterFactory(mdTxnCtx, adapterEntity, adapterFactoryClassname);
                         break;
                     default:
                         throw new AsterixException("Unknown Adapter type " + adapterType);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 6def41a..9099c88 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -46,7 +46,6 @@
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.external.IAdapterFactoryService;
-import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.metadata.IMetadataLockUtil;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -74,7 +73,6 @@
     private ICCServiceContext ccServiceCtx;
     private IStorageComponentProvider storageComponentProvider;
     private IGlobalRecoveryManager globalRecoveryManager;
-    private ILibraryManager libraryManager;
     private IResourceIdManager resourceIdManager;
     private CompilerProperties compilerProperties;
     private ExternalProperties externalProperties;
@@ -104,15 +102,14 @@
     private final IAdapterFactoryService adapterFactoryService;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
-            ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
-            IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
-            IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
-            IMetadataLockManager mdLockManager, IMetadataLockUtil mdLockUtil, IReceptionistFactory receptionistFactory,
+            Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
+            INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
+            IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager,
+            IMetadataLockUtil mdLockUtil, IReceptionistFactory receptionistFactory,
             IConfigValidatorFactory configValidatorFactory, Object extensionManager,
             IAdapterFactoryService adapterFactoryService) throws AlgebricksException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
-        this.libraryManager = libraryManager;
         this.activeLifeCycleListener = activeLifeCycleListener;
         this.extensionManager = extensionManager;
         // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
@@ -219,11 +216,6 @@
     }
 
     @Override
-    public ILibraryManager getLibraryManager() {
-        return libraryManager;
-    }
-
-    @Override
     public Object getExtensionManager() {
         return extensionManager;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e813141..0ee3658 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -386,12 +386,10 @@
         private static final long serialVersionUID = 1L;
         private final List<URL> binaryURLs;
         private final DeploymentId deploymentId;
-        private final boolean extractFromArchive;
 
-        public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId, boolean isExtractFromArchive) {
+        public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId) {
             this.binaryURLs = binaryURLs;
             this.deploymentId = deploymentId;
-            this.extractFromArchive = isExtractFromArchive;
         }
 
         @Override
@@ -406,10 +404,6 @@
         public DeploymentId getDeploymentId() {
             return deploymentId;
         }
-
-        public boolean isExtractFromArchive() {
-            return extractFromArchive;
-        }
     }
 
     public static class CliUnDeployBinaryFunction extends Function {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index f295119..1118a68 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -92,8 +92,6 @@
      *
      * @param jobSpec
      *            Job Specification
-     * @param jobFlags
-     *            Flags
      * @throws Exception
      */
     DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
@@ -168,13 +166,12 @@
 
     /**
      * Deploy files to the cluster
-     *
-     * @param files
-     *            a list of file paths
      * @param deploymentId
      *            the id used to uniquely identify this set of files for management
+     * @param files
+     *            a list of file paths
      */
-    void deployBinary(DeploymentId deploymentId, List<String> files, boolean extractFromArchive) throws Exception;
+    void deployBinary(DeploymentId deploymentId, List<String> files) throws Exception;
 
     /**
      * undeploy a certain deployment
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 053276f..4cc47d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -57,8 +57,7 @@
 
     public ClusterTopology getClusterTopology() throws Exception;
 
-    public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId, boolean extractFromArchive)
-            throws Exception;
+    public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
 
     public void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 962f826..75f4848 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -20,6 +20,7 @@
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -57,8 +58,12 @@
 
     public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
 
+    public void truncate(IFileHandle fileHandle, long size) throws HyracksDataException;
+
     public long getSize(IFileHandle fileHandle);
 
+    public WritableByteChannel newWritableChannel(IFileHandle fileHandle);
+
     public void deleteWorkspaceFiles() throws HyracksDataException;
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 09ecb15..6ad53ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -21,9 +21,11 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -132,4 +134,22 @@
         return files;
     }
 
+    public static void flushDirectory(File directory) throws IOException {
+        flushDirectory(directory.toPath());
+    }
+
+    public static void flushDirectory(Path path) throws IOException {
+        if (!Files.isDirectory(path)) {
+            throw new IOException("Not a directory: " + path);
+        }
+        if (Files.getFileStore(path).supportsFileAttributeView("posix")) {
+            try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
+                ch.force(true);
+            }
+        } else {
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("Unable to flush directory " + path);
+            }
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 25474769..e6973dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -165,7 +165,7 @@
                 HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
                         (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
                 ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), dbf.getDeploymentId(),
-                        dbf.isExtractFromArchive(), new IPCResponder<>(handle, mid)));
+                        new IPCResponder<>(handle, mid)));
                 break;
             case CLI_UNDEPLOY_BINARY:
                 HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/ApplicationInstallationHandler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/ApplicationInstallationHandler.java
index f400978..18a2817 100755
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/ApplicationInstallationHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/ApplicationInstallationHandler.java
@@ -28,6 +28,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -68,8 +69,9 @@
         final String fileName = params[1];
         final String rootDir = ccs.getServerContext().getBaseDir().toString();
 
-        final String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + "applications/" + deployIdString
-                : rootDir + File.separator + "/applications/" + File.separator + deployIdString;
+        final String deploymentDir = rootDir.endsWith(File.separator)
+                ? rootDir + ServerContext.APP_DIR_NAME + File.separator + deployIdString
+                : rootDir + File.separator + ServerContext.APP_DIR_NAME + File.separator + deployIdString;
         final HttpMethod method = request.getHttpRequest().method();
         try {
             response.setStatus(HttpResponseStatus.OK);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
index 53f9360..4962607 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -47,15 +47,13 @@
     private List<URL> binaryURLs;
     private DeploymentId deploymentId;
     private IPCResponder<DeploymentId> callback;
-    private boolean extractFromArchive;
 
     public CliDeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs, DeploymentId deploymentId,
-            boolean extractFromArchive, IPCResponder<DeploymentId> callback) {
+            IPCResponder<DeploymentId> callback) {
         this.ccs = ncs;
         this.binaryURLs = binaryURLs;
         this.deploymentId = deploymentId;
         this.callback = callback;
-        this.extractFromArchive = extractFromArchive;
     }
 
     @Override
@@ -68,7 +66,7 @@
              * Deploy for the cluster controller
              */
             DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getContext().getJobSerializerDeserializerContainer(),
-                    ccs.getServerContext(), false, extractFromArchive);
+                    ccs.getServerContext(), false);
 
             /**
              * Deploy for the node controllers
@@ -84,7 +82,7 @@
              * deploy binaries to each node controller
              */
             for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) {
-                ncs.getNodeController().deployBinary(deploymentId, binaryURLs, extractFromArchive);
+                ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
             }
 
             ccs.getExecutor().execute(new Runnable() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 7f172b6..42a0d66 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -51,7 +51,7 @@
 
     void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
 
-    void deployBinary(DeploymentId deploymentId, List<URL> url, boolean extractFromArchive) throws Exception;
+    void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
 
     void undeployBinary(DeploymentId deploymentId) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
index ef2777d..e1c4d59 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
@@ -24,6 +24,8 @@
 
 public class ServerContext implements IServerContext {
 
+    public static final String APP_DIR_NAME = "applications";
+
     private final ServerType type;
     private final File baseDir;
     private final File appDir;
@@ -31,7 +33,7 @@
     public ServerContext(ServerType type, File baseDir) {
         this.type = type;
         this.baseDir = baseDir;
-        this.appDir = new File(baseDir, "applications");
+        this.appDir = new File(baseDir, APP_DIR_NAME);
     }
 
     public ServerType getServerType() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
index e0150f7..ceb0da8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
@@ -22,20 +22,12 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URL;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Enumeration;
 import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
@@ -48,7 +40,6 @@
 import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.common.context.ServerContext;
-import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -61,11 +52,6 @@
 
     public static final String DEPLOYMENT = "applications";
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final String SHIV_SUFFIX = ".pyz";
-    private static final String ZIP_SUFFIX = ".zip";
-    private static final String SHIV_ROOT = "SHIV_ROOT";
-    private static final String SHIV_ENTRY_POINT = "SHIV_ENTRY_POINT";
-    private static final String SHIV_INTERPRETER = "SHIV_INTERPRETER";
 
     /**
      * undeploy an existing deployment
@@ -102,12 +88,13 @@
      * @param container
      *            the container of serializer/deserializer
      * @param ctx
-     *            the ServerContext * @param isNC
+     *            the ServerContext
+     * @param isNC
      *            true if NC, false if CC
      * @throws HyracksException
      */
     public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
-            ServerContext ctx, boolean isNC, boolean extractFromArchive) throws HyracksException {
+            ServerContext ctx, boolean isNC) throws HyracksException {
         IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeserializer(deploymentId);
         if (jobSerDe == null) {
             jobSerDe = new ClassLoaderJobSerializerDeserializer();
@@ -116,11 +103,7 @@
         String rootDir = ctx.getBaseDir().toString();
         String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId
                 : rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId;
-        if (extractFromArchive) {
-            downloadURLs(urls, deploymentDir, isNC, true);
-        } else {
-            jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC, false));
-        }
+        jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC));
     }
 
     /**
@@ -195,8 +178,7 @@
      * @return a list of local file URLs
      * @throws HyracksException
      */
-    private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC,
-            boolean extractFromArchive) throws HyracksException {
+    private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
         //retry 10 times at maximum for downloading binaries
         int retryCount = 10;
         int tried = 0;
@@ -227,13 +209,6 @@
                             os.close();
                         }
                     }
-                    if (extractFromArchive) {
-                        if (targetFile.getAbsolutePath().endsWith(SHIV_SUFFIX)) {
-                            shiv(targetFile.getAbsolutePath(), deploymentDir);
-                        } else if (targetFile.getAbsolutePath().endsWith(ZIP_SUFFIX)) {
-                            unzip(targetFile.getAbsolutePath(), deploymentDir);
-                        }
-                    }
                     downloadedFileURLs.add(targetFile.toURI().toURL());
                 }
                 return downloadedFileURLs;
@@ -244,61 +219,4 @@
         }
         throw HyracksException.create(trace);
     }
-
-    public static void unzip(String sourceFile, String outputDirName) throws IOException {
-        try (ZipFile zipFile = new ZipFile(sourceFile)) {
-            Path outputPath = Paths.get(FilenameUtils.normalize(outputDirName));
-            File outputDir = outputPath.toFile();
-            if (!outputDir.exists()) {
-                throw new IOException("Output path doesn't exist");
-            }
-            if (!outputDir.isDirectory()) {
-                throw new IOException("Output path is not a directory");
-            }
-            Enumeration<? extends ZipEntry> entries = zipFile.entries();
-            List<File> createdFiles = new ArrayList<>();
-            while (entries.hasMoreElements()) {
-                ZipEntry entry = entries.nextElement();
-                String normalizedPath = FilenameUtils.normalize(FileUtil.joinPath(outputDirName, entry.getName()));
-                Path candidatePath = Paths.get(normalizedPath);
-                if (!candidatePath.startsWith(outputPath)) {
-                    throw new IOException("Malformed ZIP archive");
-                }
-                File entryDestination = new File(outputDir, entry.getName());
-                if (!entry.isDirectory()) {
-                    entryDestination.getParentFile().mkdirs();
-                    try (InputStream in = zipFile.getInputStream(entry);
-                            OutputStream out = new FileOutputStream(entryDestination)) {
-                        createdFiles.add(entryDestination);
-                        IOUtils.copy(in, out);
-                    } catch (IOException e) {
-                        for (File f : createdFiles) {
-                            if (!f.delete()) {
-                                LOGGER.error("Couldn't clean up file after failed archive extraction: "
-                                        + f.getAbsolutePath());
-                            }
-                        }
-                        throw e;
-                    }
-                }
-            }
-        }
-    }
-
-    public static void shiv(String sourceFile, String outputDir) throws IOException {
-        loadShim(outputDir, "pyro4.pyz");
-        unzip(sourceFile, outputDir);
-        unzip(outputDir + File.separator + "pyro4.pyz", outputDir);
-        loadShim(outputDir, "entrypoint.py");
-    }
-
-    public static void loadShim(String outputDir, String name) throws IOException {
-        try (InputStream is = DeploymentUtils.class.getClassLoader().getResourceAsStream(name)) {
-            if (is != null) {
-                IOUtils.copyLarge(is, new FileOutputStream(new File(outputDir + File.separator + name)));
-            } else {
-                throw new IOException("Classpath does not contain necessary Python resources!");
-            }
-        }
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 43b9003..2ba4768 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1176,14 +1176,11 @@
 
         private final List<URL> binaryURLs;
         private final DeploymentId deploymentId;
-        private final boolean extractFromArchive;
 
-        public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId,
-                boolean isExtractFromArchive) {
+        public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
             super(ccId);
             this.binaryURLs = binaryURLs;
             this.deploymentId = deploymentId;
-            this.extractFromArchive = isExtractFromArchive;
         }
 
         @Override
@@ -1198,10 +1195,6 @@
         public DeploymentId getDeploymentId() {
             return deploymentId;
         }
-
-        public boolean isExtractFromArchive() {
-            return extractFromArchive;
-        }
     }
 
     public static class UnDeployBinaryFunction extends CCIdentifiedFunction {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index ea71bc9..d32ee32 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -92,9 +92,8 @@
     }
 
     @Override
-    public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs, boolean extractFromArchive)
-            throws Exception {
-        DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId, extractFromArchive);
+    public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
+        DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId);
         ipcHandle.send(-1, rpaf, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index df08c04..8959053 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -102,8 +102,8 @@
 
             case DEPLOY_BINARY:
                 CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(),
-                        dbf.getCcId(), dbf.isExtractFromArchive()));
+                ncs.getWorkQueue()
+                        .schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), dbf.getCcId()));
                 return;
 
             case UNDEPLOY_BINARY:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index bab729f..1356f4c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -216,8 +216,8 @@
             jobletMap = new ConcurrentHashMap<>();
             deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
             timer = new Timer(true);
-            serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
-                    new File(ioManager.getWorkspacePath(0), id));
+            File ncBaseDir = ioManager.getWorkspacePath(0);
+            serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, ncBaseDir);
             getNodeControllerInfosAcceptor = new MutableObject<>();
             memoryManager =
                     new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 9054377..57bda8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -75,7 +75,7 @@
         ensureOpen();
     }
 
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
         if (raf == null) {
             return;
         }
@@ -101,4 +101,8 @@
             }
         }
     }
+
+    public synchronized boolean isOpen() {
+        return raf != null && raf.getChannel().isOpen();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 6c35eff..da78a0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -25,6 +25,7 @@
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -365,6 +366,15 @@
     }
 
     @Override
+    public void truncate(IFileHandle fileHandle, long size) throws HyracksDataException {
+        try {
+            ((FileHandle) fileHandle).getFileChannel().truncate(size);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
     public long getSize(IFileHandle fileHandle) {
         return fileHandle.getFileReference().getFile().length();
     }
@@ -435,4 +445,32 @@
         }
         return totalSize;
     }
+
+    @Override
+    public WritableByteChannel newWritableChannel(IFileHandle fHandle) {
+        FileHandle fh = (FileHandle) fHandle;
+        if (!fh.isOpen()) {
+            throw new IllegalStateException("closed");
+        }
+        return new WritableByteChannel() {
+            long position;
+
+            @Override
+            public int write(ByteBuffer src) throws IOException {
+                int written = IOManager.this.syncWrite(fHandle, position, src);
+                position += written;
+                return written;
+            }
+
+            @Override
+            public boolean isOpen() {
+                return fh.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+                IOManager.this.close(fh);
+            }
+        };
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
index 586b539..e645d8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
@@ -38,19 +38,16 @@
  */
 public class DeployBinaryWork extends AbstractWork {
 
-    private DeploymentId deploymentId;
-    private NodeControllerService ncs;
-    private List<URL> binaryURLs;
+    private final DeploymentId deploymentId;
+    private final NodeControllerService ncs;
+    private final List<URL> binaryURLs;
     private final CcId ccId;
-    private final boolean extractFromArchive;
 
-    public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId,
-            boolean extractFromArchive) {
+    public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
         this.deploymentId = deploymentId;
         this.ncs = ncs;
         this.binaryURLs = binaryURLs;
         this.ccId = ccId;
-        this.extractFromArchive = extractFromArchive;
     }
 
     @Override
@@ -58,7 +55,7 @@
         DeploymentStatus status;
         try {
             DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getContext().getJobSerializerDeserializerContainer(),
-                    ncs.getServerContext(), true, extractFromArchive);
+                    ncs.getServerContext(), true);
             status = DeploymentStatus.SUCCEED;
         } catch (Exception e) {
             status = DeploymentStatus.FAIL;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
index 186fb0e..515b95c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.http.api;
 
+import java.io.IOException;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hyracks.http.server.HttpServer;
@@ -55,4 +56,10 @@
     default IChannelClosedHandler getChannelClosedHandler(HttpServer server) {
         return server.getChannelClosedHandler();
     }
+
+    /**
+     * Called at server startup to initialize the servlet
+     */
+    default void init() throws IOException {
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
index 3977207..514a7dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
@@ -53,7 +53,7 @@
 
     protected final String[] paths;
     protected final ConcurrentMap<String, Object> ctx;
-    private final int[] trims;
+    protected final int[] trims;
 
     public AbstractServlet(ConcurrentMap<String, Object> ctx, String... paths) {
         this.paths = paths;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 4cae09f..93762a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.http.server;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -224,7 +225,10 @@
         return servlets.getServlets();
     }
 
-    protected void doStart() throws InterruptedException {
+    protected void doStart() throws InterruptedException, IOException {
+        for (IServlet servlet : servlets.getServlets()) {
+            servlet.init();
+        }
         channel = bind();
     }
 
@@ -372,4 +376,8 @@
     public HttpServerConfig getConfig() {
         return config;
     }
+
+    public InetSocketAddress getAddress() {
+        return address;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
index 840fa24..82d7a3b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
@@ -144,11 +144,9 @@
     }
 
     @Override
-    public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId, boolean extractFromArchive)
-            throws Exception {
+    public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception {
         HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
-                new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId,
-                        extractFromArchive);
+                new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId);
         rpci.call(ipcHandle, dbf);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index fa6f2f0..9351348 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -217,13 +217,12 @@
     public DeploymentId deployBinary(List<String> files) throws Exception {
         /** generate a deployment id */
         DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString());
-        deployBinary(deploymentId, files, false);
+        deployBinary(deploymentId, files);
         return deploymentId;
     }
 
     @Override
-    public void deployBinary(DeploymentId deploymentId, List<String> files, boolean extractFromArchive)
-            throws Exception {
+    public void deployBinary(DeploymentId deploymentId, List<String> files) throws Exception {
         List<URL> binaryURLs = new ArrayList<>();
         if (files != null && !files.isEmpty()) {
             CloseableHttpClient hc = new DefaultHttpClient();
@@ -250,7 +249,7 @@
             }
         }
         /** deploy the URLs to the CC and NCs */
-        hci.deployBinary(binaryURLs, deploymentId, extractFromArchive);
+        hci.deployBinary(binaryURLs, deploymentId);
     }
 
     @Override