[NO ISSUE][COMP][RT] New library deployment model
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Deploy external libraries using Hyracks jobs
- Library manager is no longer available on CC
- Add IServlet.init() which is invoked during web server startup
- External libraries can no longer provide implementations of
IExternalDataSourceFactory because external libraries are
not available on CC
- Added testcase for an external adapter
Change-Id: If64f99f6a15b81b1e426239bde63360f5ef57059
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6863
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 4ca3275..8c35ea3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -32,7 +32,6 @@
import org.apache.asterix.common.exceptions.NoOpWarningCollector;
import org.apache.asterix.common.exceptions.WarningCollector;
import org.apache.asterix.common.exceptions.WarningUtil;
-import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
@@ -364,10 +363,9 @@
}
private boolean canConstantFold(ScalarFunctionCallExpression function) throws AlgebricksException {
- // skip external functions that are not implemented in Java
+ // skip external functions because they're not available at compile time (on CC)
IFunctionInfo fi = function.getFunctionInfo();
- if (fi instanceof IExternalFunctionInfo
- && !ExternalFunctionLanguage.JAVA.equals(((IExternalFunctionInfo) fi).getLanguage())) {
+ if (fi instanceof IExternalFunctionInfo) {
return false;
}
// skip all functions that would produce records/arrays/multisets (derived types) in their open format
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
index bac8d66..d8e167f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
@@ -20,6 +20,7 @@
import static org.apache.asterix.api.http.server.ServletConstants.CREDENTIAL_MAP;
+import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -50,7 +51,6 @@
@Override
public void handle(IServletRequest request, IServletResponse response) {
-
try {
boolean authorized = authorize(request);
if (!authorized) {
@@ -89,7 +89,7 @@
return false;
}
String providedUsername = providedCredentials[0];
- String storedPw = storedCredentials.get(providedUsername);
+ String storedPw = getStoredCredentials(request).get(providedUsername);
if (storedPw == null) {
LOGGER.debug("Invalid username");
return false;
@@ -102,4 +102,18 @@
return false;
}
}
+
+ protected Map<String, String> getStoredCredentials(IServletRequest request) {
+ return storedCredentials;
+ }
+
+ public static String hashPassword(String password) {
+ return BCrypt.hashpw(password, BCrypt.gensalt(12));
+ }
+
+ public static String createAuthHeader(String user, String password) {
+ String auth = user + ":" + password;
+ byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.ISO_8859_1));
+ return "Basic " + new String(encodedAuth);
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
index b0da8e0..d0ae6ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
@@ -18,60 +18,71 @@
*/
package org.apache.asterix.api.http.server;
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA;
import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON;
-import static org.apache.asterix.common.library.LibraryDescriptor.DESCRIPTOR_NAME;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PrintWriter;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
-import java.rmi.RemoteException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import org.apache.asterix.app.external.ExternalLibraryUtils;
-import org.apache.asterix.app.message.LoadUdfMessage;
+import org.apache.asterix.app.result.ResponsePrinter;
+import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
-import org.apache.asterix.common.library.ILibrary;
-import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.library.LibraryDescriptor;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.common.metadata.IMetadataLockUtil;
-import org.apache.asterix.common.metadata.LockList;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Function;
-import org.apache.asterix.metadata.entities.Library;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullWriter;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.deployment.DeploymentId;
-import org.apache.hyracks.api.io.IPersistedResourceRegistry;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.api.exceptions.IFormattedException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.context.ServerContext;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.utils.HttpUtil;
import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import com.google.common.collect.ImmutableMap;
-
-import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
@@ -79,27 +90,68 @@
public class UdfApiServlet extends BasicAuthServlet {
private static final Logger LOGGER = LogManager.getLogger();
- private final ICcApplicationContext appCtx;
- private final ICCMessageBroker broker;
- private static final String UDF_TMP_DIR_PREFIX = "udf_temp";
- public static final int UDF_RESPONSE_TIMEOUT = 5000;
- private Map<String, ExternalFunctionLanguage> exensionMap =
- new ImmutableMap.Builder<String, ExternalFunctionLanguage>().put("pyz", PYTHON).put("zip", JAVA).build();
- public UdfApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... paths) {
+ private final ICcApplicationContext appCtx;
+ private final ClusterControllerService ccs;
+ private final HttpScheme httpServerProtocol;
+ private final int httpServerPort;
+
+ private final ILangCompilationProvider compilationProvider;
+ private final IStatementExecutorFactory statementExecutorFactory;
+ private final IStorageComponentProvider componentProvider;
+ private final IReceptionist receptionist;
+ private final Path workingDir;
+ private Map<String, String> sysCredentials;
+ private String sysAuthHeader;
+
+ public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx,
+ ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
+ IStorageComponentProvider componentProvider, HttpScheme httpServerProtocol, int httpServerPort) {
super(ctx, paths);
this.appCtx = appCtx;
- this.broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ ICCServiceContext srvCtx = appCtx.getServiceContext();
+ this.ccs = (ClusterControllerService) srvCtx.getControllerService();
+ this.compilationProvider = compilationProvider;
+ this.statementExecutorFactory = statementExecutorFactory;
+ this.componentProvider = componentProvider;
+ this.receptionist = appCtx.getReceptionist();
+ this.httpServerProtocol = httpServerProtocol;
+ this.httpServerPort = httpServerPort;
+ File baseDir = srvCtx.getServerCtx().getBaseDir();
+ this.workingDir = baseDir.getAbsoluteFile().toPath().normalize().resolve(
+ Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, "tmp"));
}
- private Pair<String, DataverseName> getResource(FullHttpRequest req) throws IllegalArgumentException {
- String[] path = new QueryStringDecoder(req.uri()).path().split("/");
- if (path.length != 5) {
- throw new IllegalArgumentException("Invalid resource.");
+ @Override
+ public void init() throws IOException {
+ super.init();
+ initAuth();
+ initStorage();
+ }
+
+ private void initAuth() {
+ // generate internal user
+ String sysUser;
+ do {
+ sysUser = generateRandomString(32);
+ } while (storedCredentials.containsKey(sysUser));
+ String sysPassword = generateRandomString(128);
+ this.sysCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword));
+ this.sysAuthHeader = createAuthHeader(sysUser, sysPassword);
+ }
+
+ private void initStorage() throws IOException {
+ // prepare working directory
+ if (Files.isDirectory(workingDir)) {
+ try {
+ FileUtils.cleanDirectory(workingDir.toFile());
+ } catch (IOException e) {
+ LOGGER.warn("Could not clean directory: " + workingDir, e);
+ }
+ } else {
+ Files.deleteIfExists(workingDir);
+ FileUtil.forceMkdirs(workingDir.toFile());
}
- String resourceName = path[path.length - 1];
- DataverseName dataverseName = DataverseName.createFromCanonicalForm(path[path.length - 2]); // TODO: use path separators instead for multiparts
- return new Pair<>(resourceName, dataverseName);
}
@Override
@@ -109,161 +161,67 @@
response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
return;
}
-
- PrintWriter responseWriter = response.writer();
- FullHttpRequest req = request.getHttpRequest();
- Pair<String, DataverseName> resourceNames;
- try {
- resourceNames = getResource(req);
- } catch (IllegalArgumentException e) {
+ HttpRequest httpRequest = request.getHttpRequest();
+ Pair<DataverseName, String> libraryName = parseLibraryName(request);
+ if (libraryName == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- String resourceName = resourceNames.first;
- DataverseName dataverse = resourceNames.second;
- HttpPostRequestDecoder multipartDec = new HttpPostRequestDecoder(req);
- File udfFile = null;
- IMetadataLockUtil mdLockUtil = appCtx.getMetadataLockUtil();
- MetadataTransactionContext mdTxnCtx = null;
- LockList mdLockList = null;
+ Path libraryTempFile = null;
+ HttpPostRequestDecoder requestDecoder = new HttpPostRequestDecoder(httpRequest);
try {
- if (!multipartDec.hasNext() || multipartDec.getBodyHttpDatas().size() != 1) {
+ if (!requestDecoder.hasNext() || requestDecoder.getBodyHttpDatas().size() != 1) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- InterfaceHttpData f = multipartDec.getBodyHttpDatas().get(0);
- if (!f.getHttpDataType().equals(InterfaceHttpData.HttpDataType.FileUpload)) {
+ InterfaceHttpData httpData = requestDecoder.getBodyHttpDatas().get(0);
+ if (!httpData.getHttpDataType().equals(InterfaceHttpData.HttpDataType.FileUpload)) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- MetadataManager.INSTANCE.init();
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- MetadataProvider metadataProvider = MetadataProvider.create(appCtx, null);
- mdLockList = metadataProvider.getLocks();
- mdLockUtil.createLibraryBegin(appCtx.getMetadataLockManager(), metadataProvider.getLocks(), dataverse,
- resourceName);
- File workingDir = new File(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
- UDF_TMP_DIR_PREFIX);
- if (!workingDir.exists()) {
- FileUtil.forceMkdirs(workingDir);
- }
- FileUpload udf = (FileUpload) f;
- String[] fileNameParts = udf.getFilename().split("\\.");
- String suffix = fileNameParts[fileNameParts.length - 1];
- ExternalFunctionLanguage libLang = exensionMap.get(suffix);
- if (libLang == null) {
+ FileUpload fileUpload = (FileUpload) httpData;
+ String fileExt = FilenameUtils.getExtension(fileUpload.getFilename());
+ ExternalFunctionLanguage language = getLanguageByFileExtension(fileExt);
+ if (language == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- LibraryDescriptor desc = new LibraryDescriptor(libLang);
- udfFile = File.createTempFile(resourceName, "." + suffix, workingDir);
- udf.renameTo(udfFile);
- setupBinariesAndClassloaders(dataverse, resourceName, udfFile, desc);
- installLibrary(mdTxnCtx, dataverse, resourceName);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- response.setStatus(HttpResponseStatus.OK);
- } catch (Exception e) {
try {
- ExternalLibraryUtils.deleteDeployedUdf(broker, appCtx, dataverse, resourceName);
- } catch (Exception e2) {
- e.addSuppressed(e2);
+ IRequestReference requestReference = receptionist.welcome(request);
+ libraryTempFile = Files.createTempFile(workingDir, "lib_", '.' + fileExt);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Created temporary file " + libraryTempFile + " for library " + libraryName.first + "."
+ + libraryName.second);
+ }
+ fileUpload.renameTo(libraryTempFile.toFile());
+ URI downloadURI = createDownloadURI(libraryTempFile);
+ CreateLibraryStatement stmt = new CreateLibraryStatement(libraryName.first, libraryName.second,
+ language, downloadURI, true, sysAuthHeader);
+ executeStatement(stmt, requestReference);
+ response.setStatus(HttpResponseStatus.OK);
+ } catch (Exception e) {
+ response.setStatus(toHttpErrorStatus(e));
+ PrintWriter responseWriter = response.writer();
+ responseWriter.write(e.getMessage());
+ responseWriter.flush();
+ LOGGER.error("Error creating/updating library " + libraryName.first + "." + libraryName.second, e);
}
- if (mdTxnCtx != null) {
+ } finally {
+ requestDecoder.destroy();
+ if (libraryTempFile != null) {
try {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- } catch (RemoteException r) {
- LOGGER.error("Unable to abort metadata transaction", r);
+ Files.deleteIfExists(libraryTempFile);
+ } catch (IOException e) {
+ LOGGER.warn("Could not delete temporary file " + libraryTempFile, e);
}
}
- response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
- responseWriter.write(e.getMessage());
- responseWriter.flush();
- LOGGER.error(e);
- } finally {
- multipartDec.destroy();
- if (udfFile != null) {
- udfFile.delete();
- }
- if (mdLockList != null) {
- mdLockList.unlock();
- }
}
}
- private File writeDescriptor(File folder, LibraryDescriptor desc) throws IOException {
- IPersistedResourceRegistry reg = appCtx.getServiceContext().getPersistedResourceRegistry();
- byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(desc.toJson(reg));
- File descFile = new File(folder, DESCRIPTOR_NAME);
- FileUtil.writeAndForce(Paths.get(descFile.getAbsolutePath()), bytes);
- return descFile;
- }
-
- private void setupBinariesAndClassloaders(DataverseName dataverse, String resourceName, File udfFile,
- LibraryDescriptor desc) throws Exception {
- IHyracksClientConnection hcc = appCtx.getHcc();
- ILibraryManager libMgr = appCtx.getLibraryManager();
- DeploymentId udfName = new DeploymentId(ExternalLibraryUtils.makeDeploymentId(dataverse, resourceName));
- ILibrary lib = libMgr.getLibrary(dataverse, resourceName);
- if (lib != null) {
- deleteUdf(dataverse, resourceName);
- }
- File descriptor = writeDescriptor(udfFile.getParentFile(), desc);
- hcc.deployBinary(udfName, Arrays.asList(udfFile.getAbsolutePath(), descriptor.getAbsolutePath()), true);
- String deployedPath =
- FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
- DeploymentUtils.DEPLOYMENT, udfName.toString());
- if (!descriptor.delete()) {
- throw new IOException("Could not remove already uploaded library descriptor");
- }
- libMgr.setUpDeployedLibrary(deployedPath);
- long reqId = broker.newRequestId();
- List<INcAddressedMessage> requests = new ArrayList<>();
- List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
- ncs.forEach(s -> requests.add(new LoadUdfMessage(dataverse, resourceName, reqId)));
- broker.sendSyncRequestToNCs(reqId, ncs, requests, UDF_RESPONSE_TIMEOUT);
- }
-
- private static void installLibrary(MetadataTransactionContext mdTxnCtx, DataverseName dataverse, String libraryName)
- throws RemoteException, AlgebricksException {
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
- if (dv == null) {
- throw new AsterixException(ErrorCode.UNKNOWN_DATAVERSE);
- }
- Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName);
- if (libraryInMetadata != null) {
- //replacing binary, library already exists
- return;
- }
- // Add library
- MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new Library(dataverse, libraryName));
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Added library " + libraryName + " to Metadata");
- }
- }
-
- private static void deleteLibrary(MetadataTransactionContext mdTxnCtx, DataverseName dataverse, String libraryName)
- throws RemoteException, AlgebricksException {
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
- if (dv == null) {
- throw new AsterixException(ErrorCode.UNKNOWN_DATAVERSE, dataverse);
- }
- Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName);
- if (library == null) {
- throw new AsterixException(ErrorCode.UNKNOWN_LIBRARY, libraryName);
- }
- List<Function> functions = MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dataverse);
- for (Function function : functions) {
- if (libraryName.equals(function.getLibrary())) {
- throw new AsterixException(ErrorCode.METADATA_DROP_LIBRARY_IN_USE, libraryName);
- }
- }
- List<DatasourceAdapter> adapters = MetadataManager.INSTANCE.getDataverseAdapters(mdTxnCtx, dataverse);
- for (DatasourceAdapter adapter : adapters) {
- if (libraryName.equals(adapter.getLibrary())) {
- throw new AsterixException(ErrorCode.METADATA_DROP_LIBRARY_IN_USE, libraryName);
- }
- }
- MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverse, libraryName);
+ private URI createDownloadURI(Path file) throws Exception {
+ String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName();
+ String host = getHyracksClientConnection().getHost();
+ return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null);
}
@Override
@@ -273,54 +231,139 @@
response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
return;
}
-
- Pair<String, DataverseName> resourceNames;
- try {
- resourceNames = getResource(request.getHttpRequest());
- } catch (IllegalArgumentException e) {
+ Pair<DataverseName, String> libraryName = parseLibraryName(request);
+ if (libraryName == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- PrintWriter responseWriter = response.writer();
- String resourceName = resourceNames.first;
- DataverseName dataverse = resourceNames.second;
try {
- deleteUdf(dataverse, resourceName);
+ IRequestReference requestReference = receptionist.welcome(request);
+ LibraryDropStatement stmt = new LibraryDropStatement(libraryName.first, libraryName.second);
+ executeStatement(stmt, requestReference);
+ response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
- response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ response.setStatus(toHttpErrorStatus(e));
+ PrintWriter responseWriter = response.writer();
responseWriter.write(e.getMessage());
responseWriter.flush();
- return;
+ LOGGER.error("Error deleting library " + libraryName.first + "." + libraryName.second, e);
}
- response.setStatus(HttpResponseStatus.OK);
}
- private void deleteUdf(DataverseName dataverse, String resourceName) throws Exception {
- IMetadataLockUtil mdLockUtil = appCtx.getMetadataLockUtil();
- MetadataTransactionContext mdTxnCtx = null;
- LockList mdLockList = null;
+ private void executeStatement(Statement statement, IRequestReference requestReference) throws Exception {
+ SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM),
+ new PrintWriter(NullWriter.NULL_WRITER));
+ ResponsePrinter printer = new ResponsePrinter(sessionOutput);
+ ResultProperties resultProperties = new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE, 1);
+ IRequestParameters requestParams = new RequestParameters(requestReference, "", null, resultProperties,
+ new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null,
+ Collections.emptyMap(), Collections.emptyMap(), false);
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator = statementExecutorFactory.create(appCtx, Collections.singletonList(statement),
+ sessionOutput, compilationProvider, componentProvider, printer);
+ translator.compileAndExecute(getHyracksClientConnection(), requestParams);
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response) throws Exception {
+ IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
+ if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+ response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+ return;
+ }
+ String localPath = localPath(request);
+ while (localPath.startsWith("/")) {
+ localPath = localPath.substring(1);
+ }
+ if (localPath.isEmpty()) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ Path filePath = workingDir.resolve(localPath).normalize();
+ if (!filePath.startsWith(workingDir)) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ readFromFile(filePath, response);
+ }
+
+ private IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
+ IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+ if (hcc == null) {
+ throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
+ }
+ return hcc;
+ }
+
+ @Override
+ protected Map<String, String> getStoredCredentials(IServletRequest request) {
+ return request.getHttpRequest().method().equals(HttpMethod.GET) ? sysCredentials
+ : super.getStoredCredentials(request);
+ }
+
+ private Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException {
+ String[] path = StringUtils.split(localPath(request), '/');
+ int ln = path.length;
+ if (ln < 2) {
+ return null;
+ }
+ String libraryName = path[ln - 1];
+ DataverseName dataverseName = DataverseName.create(Arrays.asList(path), 0, ln - 1);
+ return new Pair<>(dataverseName, libraryName);
+ }
+
+ private static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) {
+ switch (fileExtension) {
+ case LibraryDescriptor.FILE_EXT_ZIP:
+ return JAVA;
+ case LibraryDescriptor.FILE_EXT_PYZ:
+ return PYTHON;
+ default:
+ return null;
+ }
+ }
+
+ private HttpResponseStatus toHttpErrorStatus(Exception e) {
+ if (e instanceof IFormattedException) {
+ IFormattedException fe = (IFormattedException) e;
+ if (ErrorCode.ASTERIX.equals(fe.getComponent())) {
+ switch (fe.getErrorCode()) {
+ case ErrorCode.UNKNOWN_DATAVERSE:
+ case ErrorCode.UNKNOWN_LIBRARY:
+ return HttpResponseStatus.NOT_FOUND;
+ }
+ }
+ }
+ return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+ }
+
+ private static String generateRandomString(int size) {
+ return RandomStringUtils.randomAlphanumeric(size);
+ }
+
+ protected void readFromFile(Path filePath, IServletResponse response) throws Exception {
+ class InputStreamGetter extends SynchronizableWork {
+ private InputStream is;
+
+ @Override
+ protected void doRun() throws Exception {
+ is = Files.newInputStream(filePath);
+ }
+ }
+
+ InputStreamGetter r = new InputStreamGetter();
+ ccs.getWorkQueue().scheduleAndSync(r);
+
+ if (r.is == null) {
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ return;
+ }
try {
- MetadataManager.INSTANCE.init();
- mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- MetadataProvider metadataProvider = MetadataProvider.create(appCtx, null);
- mdLockList = metadataProvider.getLocks();
- mdLockUtil.dropLibraryBegin(appCtx.getMetadataLockManager(), metadataProvider.getLocks(), dataverse,
- resourceName);
- deleteLibrary(mdTxnCtx, dataverse, resourceName);
- ExternalLibraryUtils.deleteDeployedUdf(broker, appCtx, dataverse, resourceName);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e) {
- try {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- } catch (RemoteException r) {
- LOGGER.error("Unable to abort metadata transaction", r);
- }
- LOGGER.error(e);
- throw e;
+ response.setStatus(HttpResponseStatus.OK);
+ HttpUtil.setContentType(response, "application/octet-stream");
+ IOUtils.copyLarge(r.is, response.outputStream());
} finally {
- if (mdLockList != null) {
- mdLockList.unlock();
- }
+ r.is.close();
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java
new file mode 100644
index 0000000..dd1b736
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.external.operators.LibraryDeployAbortOperatorDescriptor;
+import org.apache.asterix.external.operators.LibraryDeployCommitOperatorDescriptor;
+import org.apache.asterix.external.operators.LibraryDeployPrepareOperatorDescriptor;
+import org.apache.asterix.external.operators.LibraryUndeployOperatorDescriptor;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class ExternalLibraryUtil {
+
+ private ExternalLibraryUtil() {
+ }
+
+ public static Triple<JobSpecification, JobSpecification, JobSpecification> buildCreateLibraryJobSpec(
+ DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language, URI downloadURI,
+ String authToken, MetadataProvider metadataProvider) {
+
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitsAndConstraints(appCtx);
+
+ JobSpecification prepareJobSpec = createLibraryPrepareJobSpec(dataverseName, libraryName, language, downloadURI,
+ authToken, appCtx, splitsAndConstraint);
+
+ JobSpecification commitJobSpec =
+ createLibraryCommitJobSpec(dataverseName, libraryName, appCtx, splitsAndConstraint);
+
+ JobSpecification abortJobSpec =
+ createLibraryAbortJobSpec(dataverseName, libraryName, appCtx, splitsAndConstraint);
+
+ return new Triple<>(prepareJobSpec, commitJobSpec, abortJobSpec);
+ }
+
+ private static JobSpecification createLibraryPrepareJobSpec(DataverseName dataverseName, String libraryName,
+ ExternalFunctionLanguage language, URI downloadURI, String authToken, ICcApplicationContext appCtx,
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint) {
+ JobSpecification jobSpec = RuntimeUtils.createJobSpecification(appCtx);
+ IOperatorDescriptor opDesc = new LibraryDeployPrepareOperatorDescriptor(jobSpec, dataverseName, libraryName,
+ language, downloadURI, authToken);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
+ splitsAndConstraint.second);
+ jobSpec.addRoot(opDesc);
+ return jobSpec;
+ }
+
+ private static JobSpecification createLibraryCommitJobSpec(DataverseName dataverseName, String libraryName,
+ ICcApplicationContext appCtx, Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint) {
+ JobSpecification jobSpec = RuntimeUtils.createJobSpecification(appCtx);
+ IOperatorDescriptor opDesc = new LibraryDeployCommitOperatorDescriptor(jobSpec, dataverseName, libraryName);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
+ splitsAndConstraint.second);
+ return jobSpec;
+ }
+
+ private static JobSpecification createLibraryAbortJobSpec(DataverseName dataverseName, String libraryName,
+ ICcApplicationContext appCtx, Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint) {
+ JobSpecification jobSpec = RuntimeUtils.createJobSpecification(appCtx);
+ IOperatorDescriptor opDesc = new LibraryDeployAbortOperatorDescriptor(jobSpec, dataverseName, libraryName);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
+ splitsAndConstraint.second);
+ return jobSpec;
+ }
+
+ public static JobSpecification buildDropLibraryJobSpec(DataverseName dataverseName, String libraryName,
+ MetadataProvider metadataProvider) {
+ ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitsAndConstraints(appCtx);
+
+ JobSpecification jobSpec = RuntimeUtils.createJobSpecification(appCtx);
+ IOperatorDescriptor opDesc = new LibraryUndeployOperatorDescriptor(jobSpec, dataverseName, libraryName);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc,
+ splitsAndConstraint.second);
+ jobSpec.addRoot(opDesc);
+
+ return jobSpec;
+ }
+
+ private static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitsAndConstraints(
+ ICcApplicationContext appCtx) {
+ FileSplit[] splits = getSplits(appCtx.getClusterStateManager());
+ return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ }
+
+ private static FileSplit[] getSplits(IClusterStateManager clusterStateManager) {
+ ClusterPartition[] clusterPartitions = clusterStateManager.getClusterPartitons();
+ Arrays.sort(clusterPartitions,
+ Comparator.comparing(ClusterPartition::getNodeId).thenComparingInt(ClusterPartition::getIODeviceNum));
+ List<FileSplit> splits = new ArrayList<>();
+ for (ClusterPartition partition : clusterPartitions) {
+ String activeNodeId = partition.getActiveNodeId();
+ int n = splits.size();
+ if (n > 0 && splits.get(n - 1).getNodeName().equals(activeNodeId)) {
+ continue;
+ }
+ FileSplit split = StoragePathUtil.getFileSplitForClusterPartition(partition, ".");
+ splits.add(split);
+ }
+ return splits.toArray(new FileSplit[0]);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
deleted file mode 100755
index e5d874d..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.external;
-
-import static org.apache.asterix.api.http.server.UdfApiServlet.UDF_RESPONSE_TIMEOUT;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.app.message.DeleteUdfMessage;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.hyracks.api.deployment.DeploymentId;
-
-public class ExternalLibraryUtils {
-
- private ExternalLibraryUtils() {
- }
-
- public static void deleteDeployedUdf(ICCMessageBroker broker, ICcApplicationContext appCtx,
- DataverseName dataverseName, String lib) throws Exception {
- long reqId = broker.newRequestId();
- List<INcAddressedMessage> requests = new ArrayList<>();
- List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
- ncs.forEach(s -> requests.add(new DeleteUdfMessage(dataverseName, lib, reqId)));
- broker.sendSyncRequestToNCs(reqId, ncs, requests, UDF_RESPONSE_TIMEOUT);
- appCtx.getLibraryManager().deregister(dataverseName, lib);
- appCtx.getHcc().unDeployBinary(new DeploymentId(makeDeploymentId(dataverseName, lib)));
- }
-
- public static String makeDeploymentId(DataverseName dv, String resourceName) {
- List<String> dvParts = dv.getParts();
- dvParts.add(resourceName);
- DataverseName dvWithLibrarySuffix = DataverseName.create(dvParts);
- return dvWithLibrarySuffix.getCanonicalForm();
- }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java
deleted file mode 100644
index b8c77e7..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.messaging.CcIdentifiedMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public abstract class AbstractUdfMessage extends CcIdentifiedMessage implements INcAddressedMessage {
-
- protected final DataverseName dataverseName;
- protected final String libraryName;
- protected static final Logger LOGGER = LogManager.getLogger();
-
- private static final long serialVersionUID = 3L;
-
- private final long reqId;
-
- public AbstractUdfMessage(DataverseName dataverseName, String libraryName, long reqId) {
- this.dataverseName = dataverseName;
- this.libraryName = libraryName;
- this.reqId = reqId;
- }
-
- @Override
- public void handle(INcApplicationContext appCtx) {
- ILibraryManager mgr = appCtx.getLibraryManager();
- INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
- try {
- handleAction(mgr, appCtx);
- broker.sendMessageToCC(getCcId(), new UdfResponseMessage(reqId, null));
- } catch (Exception e) {
- try {
- LOGGER.error("Error in UDF distribution", e);
- broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, e));
- } catch (Exception f) {
- LOGGER.error("Unable to send failure response to CC", f);
- }
- }
-
- }
-
- protected abstract void handleAction(ILibraryManager mgr, INcApplicationContext appCtx) throws Exception;
-
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
deleted file mode 100644
index 890f1fe..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.DataverseName;
-
-public class DeleteUdfMessage extends AbstractUdfMessage {
-
- private static final long serialVersionUID = 3L;
-
- public DeleteUdfMessage(DataverseName dataverseName, String libraryName, long reqId) {
- super(dataverseName, libraryName, reqId);
- }
-
- @Override
- protected void handleAction(ILibraryManager mgr, INcApplicationContext appCtx) {
- mgr.deregister(dataverseName, libraryName);
- }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
deleted file mode 100644
index f2d9174..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
-import org.apache.hyracks.util.file.FileUtil;
-
-public class LoadUdfMessage extends AbstractUdfMessage {
-
- private static final long serialVersionUID = 2L;
-
- public LoadUdfMessage(DataverseName dataverseName, String libraryName, long reqId) {
- super(dataverseName, libraryName, reqId);
- }
-
- @Override
- protected void handleAction(ILibraryManager mgr, INcApplicationContext appCtx) throws Exception {
- appCtx.getLibraryManager().setUpDeployedLibrary(
- FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
- DeploymentUtils.DEPLOYMENT, dataverseName.getCanonicalForm() + "." + libraryName));
- }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java
deleted file mode 100644
index 29a91c5..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.message;
-
-import java.util.ArrayList;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.messaging.api.INcResponse;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class UdfResponseMessage implements ICcAddressedMessage, INcResponse {
-
- private static final long serialVersionUID = -4520773141458281271L;
- private final long reqId;
- private final Exception failure;
-
- public UdfResponseMessage(long reqId, Exception failure) {
- this.reqId = reqId;
- this.failure = failure;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void setResult(MutablePair<ICCMessageBroker.ResponseState, Object> result) {
- ICCMessageBroker.ResponseState responseState = result.getLeft();
- if (failure != null) {
- result.setLeft(ICCMessageBroker.ResponseState.FAILURE);
- result.setRight(failure);
- return;
- }
- switch (responseState) {
- case UNINITIALIZED:
- // First to arrive
- result.setRight(new ArrayList<String>());
- // No failure, change state to success
- result.setLeft(ICCMessageBroker.ResponseState.SUCCESS);
- // Fallthrough
- case SUCCESS:
- break;
- default:
- break;
-
- }
- }
-
- @Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
- broker.respond(reqId, this);
- }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e9da651..5c15e68 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -80,6 +80,7 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -142,7 +143,7 @@
private ActiveManager activeManager;
private IReplicationChannel replicationChannel;
private IReplicationManager replicationManager;
- private final ILibraryManager libraryManager;
+ private ExternalLibraryManager libraryManager;
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
@@ -170,8 +171,6 @@
componentProvider = new StorageComponentProvider();
resourceIdFactory = new GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory();
persistedResourceRegistry = ncServiceContext.getPersistedResourceRegistry();
- libraryManager =
- new ExternalLibraryManager(ncServiceContext.getServerCtx().getAppDir(), persistedResourceRegistry);
cacheManager = new CacheManager();
}
@@ -198,7 +197,8 @@
txnSubsystem = new TransactionSubsystem(this, recoveryManagerFactory);
IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
SystemState systemState = recoveryMgr.getSystemState();
- if (initialRun || systemState == SystemState.PERMANENT_DATA_LOSS) {
+ boolean resetStorageData = initialRun || systemState == SystemState.PERMANENT_DATA_LOSS;
+ if (resetStorageData) {
//delete any storage data before the resource factory is initialized
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN,
@@ -253,6 +253,12 @@
storageProperties.getBufferCacheMaxOpenFiles(), ioQueueLen, getServiceContext().getThreadFactory());
}
+ NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService();
+ FileReference appDir =
+ ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
+ libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir);
+ libraryManager.initStorage(resetStorageData);
+
/*
* The order of registration is important. The buffer cache must registered before recovery and transaction
* managers. Notes: registered components are stopped in reversed order
@@ -280,6 +286,7 @@
lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
lccm.register(txnSubsystem.getCheckpointManager());
+ lccm.register(libraryManager);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
deleted file mode 100644
index 1656ce5..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc.task;
-
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-
-public class ExternalLibrarySetupTask implements INCLifecycleTask {
-
- private static final long serialVersionUID = 1L;
- private final boolean metadataNode;
-
- public ExternalLibrarySetupTask(boolean metadataNode) {
- this.metadataNode = metadataNode;
- }
-
- @Override
- public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
- INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
- try {
- appContext.getLibraryManager().scanLibraries(cs.getContext().getServerCtx().getAppDir());
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public String toString() {
- return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"metadata-node\" : " + metadataNode + " }";
- }
-}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 0d86cb9..7576e0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -29,7 +29,6 @@
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
-import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
@@ -202,7 +201,6 @@
if (metadataNode) {
tasks.add(new MetadataBootstrapTask(clusterManager.getMetadataPartition().getPartitionId()));
}
- tasks.add(new ExternalLibrarySetupTask(metadataNode));
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
if (metadataNode) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d1c5aeb..a7df432 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -50,7 +50,7 @@
import org.apache.asterix.app.active.ActiveEntityEventsListener;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.active.FeedEventsListener;
-import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.app.external.ExternalLibraryUtil;
import org.apache.asterix.app.result.ExecutionError;
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
@@ -81,7 +81,6 @@
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.utils.JobUtils;
@@ -110,6 +109,7 @@
import org.apache.asterix.lang.common.statement.CreateFeedStatement;
import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
import org.apache.asterix.lang.common.statement.CreateSynonymStatement;
import org.apache.asterix.lang.common.statement.DatasetDecl;
import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -125,6 +125,7 @@
import org.apache.asterix.lang.common.statement.IndexDropStatement;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
import org.apache.asterix.lang.common.statement.LoadStatement;
import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -373,6 +374,12 @@
case FUNCTION_DROP:
handleFunctionDropStatement(metadataProvider, stmt);
break;
+ case CREATE_LIBRARY:
+ handleCreateLibraryStatement(metadataProvider, stmt, hcc);
+ break;
+ case LIBRARY_DROP:
+ handleLibraryDropStatement(metadataProvider, stmt, hcc);
+ break;
case CREATE_SYNONYM:
handleCreateSynonymStatement(metadataProvider, stmt);
break;
@@ -1437,8 +1444,6 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
List<JobSpecification> jobsToExecute = new ArrayList<>();
- List<Library> librariesToDelete = new ArrayList<>();
- ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
@@ -1512,9 +1517,16 @@
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(dataset);
}
}
+
+ // #. prepare jobs which will drop corresponding libraries.
+ List<Library> libraries = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverseName);
+ for (Library library : libraries) {
+ jobsToExecute.add(ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, library.getName(),
+ metadataProvider));
+ }
+
jobsToExecute.add(DataverseUtil.dropDataverseJobSpec(dv, metadataProvider));
- librariesToDelete = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverseName);
// #. mark PendingDropOp on the dataverse record by
// first, deleting the dataverse record from the DATAVERSE_DATASET
// second, inserting the dataverse record with the PendingDropOp value into the
@@ -1531,10 +1543,6 @@
runJob(hcc, jobSpec);
}
- for (Library lib : librariesToDelete) {
- ExternalLibraryUtils.deleteDeployedUdf(broker, appCtx, dataverseName, lib.getName());
- }
-
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1567,15 +1575,11 @@
}
// #. execute compensation operations
- // remove the all indexes in NC
+ // remove the all artifacts in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
runJob(hcc, jobSpec);
}
-
- for (Library lib : librariesToDelete) {
- ExternalLibraryUtils.deleteDeployedUdf(broker, appCtx, dataverseName, lib.getName());
- }
} catch (Exception e2) {
// do no throw exception since still the metadata needs to be compensated.
e.addSuppressed(e2);
@@ -2213,6 +2217,205 @@
}
}
+ protected void handleCreateLibraryStatement(MetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws Exception {
+ CreateLibraryStatement cls = (CreateLibraryStatement) stmt;
+ DataverseName dataverseName = getActiveDataverseName(cls.getDataverseName());
+ String libraryName = cls.getLibraryName();
+ lockUtil.createLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName);
+ try {
+ doCreateLibrary(metadataProvider, dataverseName, libraryName, cls, hcc);
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ }
+
+ private void doCreateLibrary(MetadataProvider metadataProvider, DataverseName dataverseName, String libraryName,
+ CreateLibraryStatement cls, IHyracksClientConnection hcc) throws Exception {
+ JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
+ boolean prepareJobSuccessful = false;
+ JobSpecification abortJobSpec = null;
+ Library existingLibrary = null;
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+ if (dv == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, dataverseName);
+ }
+ ExternalFunctionLanguage language = cls.getLang();
+ existingLibrary = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverseName, libraryName);
+ if (existingLibrary != null && !cls.getReplaceIfExists()) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR,
+ "A library with this name " + libraryName + " already exists.");
+ }
+
+ // #. add/update library with PendingAddOp
+ Library libraryPendingAdd =
+ new Library(dataverseName, libraryName, language.name(), MetadataUtil.PENDING_ADD_OP);
+ if (existingLibrary == null) {
+ MetadataManager.INSTANCE.addLibrary(mdTxnCtx, libraryPendingAdd);
+ } else {
+ MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, libraryPendingAdd);
+ }
+
+ // #. prepare to create library artifacts in NC.
+ Triple<JobSpecification, JobSpecification, JobSpecification> jobSpecs =
+ ExternalLibraryUtil.buildCreateLibraryJobSpec(dataverseName, libraryName, language,
+ cls.getLocation(), cls.getAuthToken(), metadataProvider);
+ JobSpecification prepareJobSpec = jobSpecs.first;
+ JobSpecification commitJobSpec = jobSpecs.second;
+ abortJobSpec = jobSpecs.third;
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+ // #. create library artifacts in NCs.
+ runJob(hcc, prepareJobSpec, jobFlags);
+ prepareJobSuccessful = true;
+ runJob(hcc, commitJobSpec, jobFlags);
+
+ // #. begin new metadataTxn
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ Library newLibrary = new Library(dataverseName, libraryName, language.name(), MetadataUtil.PENDING_NO_OP);
+ MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, newLibrary);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
+ }
+ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+ boolean undoFailure = false;
+ if (!prepareJobSuccessful) {
+ // 'prepare' job failed -> try running 'abort' job
+ try {
+ runJob(hcc, abortJobSpec, jobFlags);
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
+ undoFailure = true;
+ }
+ } else if (existingLibrary == null) {
+ // 'commit' job failed for a new library -> try removing the library
+ try {
+ JobSpecification dropLibraryJobSpec = ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName,
+ libraryName, metadataProvider);
+ runJob(hcc, dropLibraryJobSpec, jobFlags);
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
+ undoFailure = true;
+ }
+ } else {
+ // 'commit' job failed for an existing library -> bad state
+ undoFailure = true;
+ }
+
+ // revert/remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ try {
+ if (existingLibrary == null) {
+ MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+ } else {
+ MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, existingLibrary);
+ }
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
+ abort(e, e2, mdTxnCtx);
+ throw new IllegalStateException("System is inconsistent state: pending library(" + libraryName
+ + ") couldn't be reverted/removed from the metadata", e);
+ }
+
+ if (undoFailure) {
+ throw new IllegalStateException(
+ "System is inconsistent state: library(" + libraryName + ") couldn't be deployed", e);
+ }
+ }
+ throw e;
+ }
+ }
+
+ protected void handleLibraryDropStatement(MetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws Exception {
+ LibraryDropStatement stmtDropLibrary = (LibraryDropStatement) stmt;
+ DataverseName dataverseName = getActiveDataverseName(stmtDropLibrary.getDataverseName());
+ String libraryName = stmtDropLibrary.getLibraryName();
+ lockUtil.dropLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName);
+ try {
+ doDropLibrary(metadataProvider, dataverseName, libraryName, hcc);
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ }
+
+ private void doDropLibrary(MetadataProvider metadataProvider, DataverseName dataverseName, String libraryName,
+ IHyracksClientConnection hcc) throws Exception {
+ JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+ if (dv == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, dataverseName);
+ }
+ Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverseName, libraryName);
+ if (library == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, libraryName);
+ }
+
+ // #. mark the existing library as PendingDropOp
+ // do drop instead of update because drop will fail if the library is used by functions/adapters
+ MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+ MetadataManager.INSTANCE.addLibrary(mdTxnCtx,
+ new Library(dataverseName, libraryName, library.getLanguage(), MetadataUtil.PENDING_DROP_OP));
+
+ // #. drop library artifacts in NCs.
+ JobSpecification jobSpec =
+ ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
+
+ // #. drop library artifacts in NCs.
+ runJob(hcc, jobSpec, jobFlags);
+
+ // #. begin new metadataTxn
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ // #. drop library
+ MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
+ }
+ if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
+ // remove the record from the metadata.
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ try {
+ MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e2) {
+ e.addSuppressed(e2);
+ abort(e, e2, mdTxnCtx);
+ throw new IllegalStateException("System is inconsistent state: pending library(" + libraryName
+ + ") couldn't be removed from the metadata", e);
+ }
+ }
+ throw e;
+ }
+ }
+
protected void handleCreateSynonymStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
CreateSynonymStatement css = (CreateSynonymStatement) stmt;
DataverseName dataverseName = getActiveDataverseName(css.getDataverseName());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index ee5b957..3eeed59 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -79,7 +79,6 @@
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.external.adapter.factory.AdapterFactoryService;
-import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.metadata.MetadataManager;
@@ -159,9 +158,7 @@
componentProvider = new StorageComponentProvider();
ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions()));
IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
- ILibraryManager libraryManager = new ExternalLibraryManager(ccServiceCtx.getServerCtx().getAppDir(),
- ccServiceCtx.getPersistedResourceRegistry());
- appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator,
+ appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator,
() -> new Receptionist("CC"), ConfigValidator::new, ccExtensionManager, new AdapterFactoryService());
final CCConfig ccConfig = controllerService.getCCConfig();
if (System.getProperty("java.rmi.server.hostname") == null) {
@@ -212,10 +209,10 @@
IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory,
CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService)
throws AlgebricksException, IOException {
- return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
- globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
- new MetadataLockManager(), createMetadataLockUtil(), receptionistFactory, configValidatorFactory,
- ccExtensionManager, adapterFactoryService);
+ return new CcApplicationContext(ccServiceCtx, getHcc(), () -> MetadataManager.INSTANCE, globalRecoveryManager,
+ lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, new MetadataLockManager(),
+ createMetadataLockUtil(), receptionistFactory, configValidatorFactory, ccExtensionManager,
+ adapterFactoryService);
}
protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
@@ -300,7 +297,7 @@
}
protected void addServlet(HttpServer server, String path) {
- server.addServlet(createServlet(server.ctx(), path, path));
+ server.addServlet(createServlet(server, path, path));
}
protected HttpServer setupQueryWebServer(ExternalProperties externalProperties) throws Exception {
@@ -315,7 +312,8 @@
return queryWebServer;
}
- protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String key, String... paths) {
+ protected IServlet createServlet(HttpServer server, String key, String... paths) {
+ ConcurrentMap<String, Object> ctx = server.ctx();
switch (key) {
case Servlets.RUNNING_REQUESTS:
return new CcQueryCancellationServlet(ctx, appCtx, paths);
@@ -349,9 +347,11 @@
case Servlets.ACTIVE_STATS:
return new ActiveStatsApiServlet(appCtx, ctx, paths);
case Servlets.UDF:
- return new UdfApiServlet(appCtx, ctx, paths);
+ return new UdfApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
+ getStatementExecutorFactory(), componentProvider, server.getScheme(),
+ server.getAddress().getPort());
default:
- throw new IllegalStateException(String.valueOf(key));
+ throw new IllegalStateException(key);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 7093660..8ea0ad4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -267,8 +267,9 @@
(FeedIntakeOperatorDescriptor) intakeJob.getOperatorMap().get(new OperatorDescriptorId(0));
FeedIntakeOperatorDescriptor ingestionOp;
if (firstOp.getAdaptorFactory() == null) {
- ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorLibraryName(),
- firstOp.getAdaptorFactoryClassName(), firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
+ ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorLibraryDataverse(),
+ firstOp.getAdaptorLibraryName(), firstOp.getAdaptorFactoryClassName(),
+ firstOp.getAdapterOutputType(), firstOp.getPolicyAccessor(),
firstOp.getOutputRecordDescriptors()[0]);
} else {
ingestionOp = new FeedIntakeOperatorDescriptor(jobSpec, feed, firstOp.getAdaptorFactory(),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 117fb63..86d8ed4 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -199,7 +199,6 @@
List<ILibraryManager> libraryManagers = new ArrayList<>();
init(deleteOldInstanceData, confDir);
if (externalLibPath != null && externalLibPath.length() != 0) {
- libraryManagers.add(((ICcApplicationContext) cc.getApplicationContext()).getLibraryManager());
for (NodeControllerService nc : ncs) {
INcApplicationContext runtimeCtx = (INcApplicationContext) nc.getApplicationContext();
libraryManagers.add(runtimeCtx.getLibraryManager());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 30ebc1d..2328bf4 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -24,7 +24,6 @@
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.IdentitiyResolverFactory;
@@ -92,8 +91,6 @@
FailedGroup.setName("failed");
List<ILibraryManager> libraryManagers = new ArrayList<>();
- // Adds the library manager for CC.
- libraryManagers.add(((ICcApplicationContext) integrationUtil.cc.getApplicationContext()).getLibraryManager());
// Adds library managers for NCs, one-per-NC.
for (NodeControllerService nc : integrationUtil.ncs) {
INcApplicationContext runtimeCtx = (INcApplicationContext) nc.getApplicationContext();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.0.ddl.sqlpp
new file mode 100644
index 0000000..617df1f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.0.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+drop dataverse externallibtest if exists;
+create dataverse externallibtest;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.1.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.1.lib.sqlpp
new file mode 100644
index 0000000..253c657
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.1.lib.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+install externallibtest testlib admin admin target/data/externallib/asterix-external-data-testlib.zip
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.2.ddl.sqlpp
new file mode 100644
index 0000000..9cb4046
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.2.ddl.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+use externallibtest;
+
+create adapter TweetAdapter language java as "testlib",
+ "org.apache.asterix.external.library.adapter.TestTypedAdapterFactory";
+
+create type TweetType as open {
+ tweetid: int64
+};
+
+create dataset Tweets(TweetType) primary key tweetid;
+
+create feed TweetFeed with {
+ "adapter-name": "TweetAdapter",
+ "type-name" : "TweetType",
+ "num_output_records": 4
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.3.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.3.update.sqlpp
new file mode 100644
index 0000000..c23a37d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.3.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+use externallibtest;
+
+set `wait-for-completion-feed` "true";
+
+connect feed TweetFeed to dataset Tweets;
+start feed TweetFeed;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.4.query.sqlpp
new file mode 100644
index 0000000..07a093a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-adapter/feed-with-external-adapter.4.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Feed with an external adapter
+ * Expected Res : Success
+ */
+
+use externallibtest;
+
+select value t
+from Tweets t
+order by t.tweetid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/deterministic/deterministic.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/deterministic/deterministic.4.adm
index 18dce53..9b1b82f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/deterministic/deterministic.4.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/deterministic/deterministic.4.adm
@@ -2,7 +2,7 @@
-- DISTRIBUTE_RESULT |UNPARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- assign [$$1] <- [{"default": getCapital_default("United States"), "deterministic": { country: "United States", capital: "Washington D.C." }, "not_deterministic": getCapital_not_deterministic("United States")}]
+ assign [$$1] <- [{"default": getCapital_default("United States"), "deterministic": getCapital_deterministic("United States"), "not_deterministic": getCapital_not_deterministic("United States")}]
-- ASSIGN |UNPARTITIONED|
empty-tuple-source
-- EMPTY_TUPLE_SOURCE |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-external-adapter/feed-with-external-adapter.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-external-adapter/feed-with-external-adapter.4.adm
new file mode 100644
index 0000000..9c810dd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-external-adapter/feed-with-external-adapter.4.adm
@@ -0,0 +1,4 @@
+{ "tweetid": 1, "message-text": "1" }
+{ "tweetid": 2, "message-text": "2" }
+{ "tweetid": 3, "message-text": "3" }
+{ "tweetid": 4, "message-text": "4" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
index cd67e42..156d5a0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_sqlpp.xml
@@ -46,7 +46,7 @@
<test-case FilePath="external-library">
<compilation-unit name="mysum_dropinuse">
<output-dir compare="Text">mysum_dropinuse</output-dir>
- <expected-error>Library testlib is being used. It cannot be dropped</expected-error>
+ <expected-error>Cannot drop library externallibtest.testlib being used by funciton externallibtest.mysum@2</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="external-library">
@@ -102,5 +102,10 @@
<output-dir compare="Text">feed-with-external-function</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feed-with-external-adapter">
+ <output-dir compare="Text">feed-with-external-adapter</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 0340898..67f8253 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -28,7 +28,6 @@
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
-import org.apache.asterix.common.library.ILibraryManager;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -56,11 +55,6 @@
NodeProperties getNodeProperties();
/**
- * @return the library manager which implements {@link org.apache.asterix.common.library.ILibraryManager}
- */
- ILibraryManager getLibraryManager();
-
- /**
* @return the service context
*/
IServiceContext getServiceContext();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index eed186c..65b587b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -23,6 +23,7 @@
import java.util.concurrent.Executor;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
@@ -134,4 +135,9 @@
* @return the cache manager
*/
ICacheManager getCacheManager();
+
+ /**
+ * @return the library manager
+ */
+ ILibraryManager getLibraryManager();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index fdba8d6..9cf6f9d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -324,7 +324,6 @@
public static final int FAILED_TO_PARSE_METADATA = 3115;
public static final int INPUT_DECODE_FAILURE = 3116;
public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
- public static final int METADATA_DROP_LIBRARY_IN_USE = 3118;
// Lifecycle management errors
public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
index 6d2df37..65b415d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -76,6 +76,6 @@
* @throws AlgebricksException
* @throws HyracksDataException
*/
- void configure(IServiceContext serviceContext, Map<String, String> configuration,
+ void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibrary.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibrary.java
index 164c215..9b60bf1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibrary.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibrary.java
@@ -21,10 +21,11 @@
package org.apache.asterix.common.library;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface ILibrary {
ExternalFunctionLanguage getLanguage();
- void close();
+ void close() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
index fdaaaeb..83f1d71 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
@@ -19,20 +19,21 @@
package org.apache.asterix.common.library;
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
public interface ILibraryManager {
- void setUpDeployedLibrary(String path) throws IOException, AsterixException;
+ ILibrary getLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException;
- void scanLibraries(File appDir);
+ void closeLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException;
- void deregister(DataverseName dv, String name);
+ // deployment helpers
- ILibrary getLibrary(DataverseName dvName, String libraryName);
+ FileReference getLibraryDir(DataverseName dataverseName, String libraryName) throws HyracksDataException;
+ void dropLibraryPath(FileReference fileRef) throws HyracksDataException;
+
+ byte[] serializeLibraryDescriptor(LibraryDescriptor libraryDescriptor) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
index 9dd2a3d..72bde09 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
@@ -24,7 +24,6 @@
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@@ -33,29 +32,35 @@
public class LibraryDescriptor implements IJsonSerializable {
private static final long serialVersionUID = 1L;
- public static final String DESCRIPTOR_NAME = "descriptor.json";
- private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
+ private static final String FIELD_LANGUAGE = "lang";
+
+ public static final String FILE_EXT_ZIP = "zip";
+
+ public static final String FILE_EXT_PYZ = "pyz";
+
/**
* The library's language
*/
private final ExternalFunctionLanguage lang;
- public LibraryDescriptor(ExternalFunctionLanguage lang) {
- this.lang = lang;
+ public LibraryDescriptor(ExternalFunctionLanguage language) {
+ this.lang = language;
}
- public ExternalFunctionLanguage getLang() {
+ public ExternalFunctionLanguage getLanguage() {
return lang;
}
public JsonNode toJson(IPersistedResourceRegistry registry) {
ObjectNode jsonNode = registry.getClassIdentifier(LibraryDescriptor.class, serialVersionUID);
- jsonNode.put("lang", lang.name());
+ jsonNode.put(FIELD_LANGUAGE, lang.name());
return jsonNode;
}
public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
- final ExternalFunctionLanguage lang = ExternalFunctionLanguage.valueOf(json.get("lang").asText());
+ String langText = json.get(FIELD_LANGUAGE).asText();
+ ExternalFunctionLanguage lang = ExternalFunctionLanguage.valueOf(langText);
return new LibraryDescriptor(lang);
}
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 9bbebf2..1cf653f 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -322,7 +322,6 @@
3115 = Failed to parse record metadata
3116 = Failed to decode input
3117 = Failed to parse record, malformed log record
-3118 = Library %1$s is being used. It cannot be dropped
# Lifecycle management errors
4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
new file mode 100644
index 0000000..45c4b12
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/ExternalAdapterFactory.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.library.ILibrary;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
+import org.apache.asterix.external.library.JavaLibrary;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public final class ExternalAdapterFactory implements ITypedAdapterFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final DataverseName libraryDataverse;
+
+ private final String libraryName;
+
+ private final String className;
+
+ private ARecordType outputType;
+
+ private ARecordType metaType;
+
+ private Map<String, String> configuration;
+
+ private transient ICCServiceContext serviceContext;
+
+ public ExternalAdapterFactory(DataverseName libraryDataverse, String libraryName, String className) {
+ this.libraryDataverse = libraryDataverse;
+ this.libraryName = libraryName;
+ this.className = className;
+ }
+
+ @Override
+ public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
+ IWarningCollector warningCollector) {
+ this.serviceContext = serviceContext;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+ //TODO:needs to be specified in the adapter configuration
+ ICcApplicationContext appCtx = (ICcApplicationContext) serviceContext.getApplicationContext();
+ return IExternalDataSourceFactory.getPartitionConstraints(appCtx, null, 1);
+ }
+
+ @Override
+ public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+ INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
+ ILibraryManager libraryManager = appCtx.getLibraryManager();
+ ILibrary library = libraryManager.getLibrary(libraryDataverse, libraryName);
+ if (ExternalFunctionLanguage.JAVA != library.getLanguage()) {
+ throw new HyracksDataException("Unexpected library language: " + library.getLanguage());
+ }
+ ClassLoader cl = ((JavaLibrary) library).getClassLoader();
+ try {
+ ITypedAdapterFactory adapterFactory = (ITypedAdapterFactory) cl.loadClass(className).newInstance();
+ adapterFactory.setOutputType(outputType);
+ adapterFactory.setMetaType(metaType);
+ adapterFactory.configure(null, configuration, ctx.getWarningCollector());
+ return adapterFactory.createAdapter(ctx, partition);
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | AlgebricksException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void setOutputType(ARecordType outputType) {
+ this.outputType = outputType;
+ }
+
+ @Override
+ public ARecordType getOutputType() {
+ return outputType;
+ }
+
+ @Override
+ public void setMetaType(ARecordType metaType) {
+ this.metaType = metaType;
+ }
+
+ @Override
+ public ARecordType getMetaType() {
+ return metaType;
+ }
+
+ @Override
+ public String getAlias() {
+ return "external:" + className;
+ }
+
+ public DataverseName getLibraryDataverse() {
+ return libraryDataverse;
+ }
+
+ public String getLibraryName() {
+ return libraryName;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index badf105..897f020 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -51,6 +50,7 @@
import org.apache.asterix.om.utils.RecordUtil;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -124,7 +124,7 @@
private void restoreExternalObjects(IServiceContext serviceContext, ILibraryManager libraryManager,
IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
if (dataSourceFactory == null) {
- dataSourceFactory = createExternalDataSourceFactory(configuration, libraryManager);
+ dataSourceFactory = createExternalDataSourceFactory(configuration);
// create and configure parser factory
if (dataSourceFactory.isIndexible() && (files != null)) {
((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
@@ -133,7 +133,7 @@
}
if (dataParserFactory == null) {
// create and configure parser factory
- dataParserFactory = createDataParserFactory(configuration, libraryManager);
+ dataParserFactory = createDataParserFactory(configuration);
dataParserFactory.setRecordType(recordType);
dataParserFactory.setMetaType(metaType);
dataParserFactory.configure(configuration);
@@ -141,18 +141,18 @@
}
@Override
- public void configure(IServiceContext serviceContext, Map<String, String> configuration,
+ public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
- IApplicationContext appCtx = (IApplicationContext) serviceContext.getApplicationContext();
+ ICcApplicationContext appCtx = (ICcApplicationContext) serviceContext.getApplicationContext();
ExternalDataUtils.validateDataSourceParameters(configuration);
- dataSourceFactory = createExternalDataSourceFactory(configuration, appCtx.getLibraryManager());
+ dataSourceFactory = createExternalDataSourceFactory(configuration);
if (dataSourceFactory.isIndexible() && (files != null)) {
((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
}
dataSourceFactory.configure(serviceContext, configuration, warningCollector);
ExternalDataUtils.validateDataParserParameters(configuration);
- dataParserFactory = createDataParserFactory(configuration, appCtx.getLibraryManager());
+ dataParserFactory = createDataParserFactory(configuration);
dataParserFactory.setRecordType(recordType);
dataParserFactory.setMetaType(metaType);
dataParserFactory.configure(configuration);
@@ -161,12 +161,12 @@
nullifyExternalObjects();
}
- private void configureFeedLogManager(IApplicationContext appCtx) throws HyracksDataException, AlgebricksException {
+ private void configureFeedLogManager(ICcApplicationContext appCtx)
+ throws HyracksDataException, AlgebricksException {
this.isFeed = ExternalDataUtils.isFeed(configuration);
if (isFeed) {
- feedLogFileSplits = FeedUtils.splitsForAdapter((ICcApplicationContext) appCtx,
- ExternalDataUtils.getDataverse(configuration), ExternalDataUtils.getFeedName(configuration),
- dataSourceFactory.getPartitionConstraint());
+ feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, ExternalDataUtils.getDataverse(configuration),
+ ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
}
}
@@ -224,13 +224,12 @@
configuration = Collections.emptyMap();
}
- protected IExternalDataSourceFactory createExternalDataSourceFactory(Map<String, String> configuration,
- ILibraryManager libraryManager) throws HyracksDataException, AsterixException {
- return DatasourceFactoryProvider.getExternalDataSourceFactory(libraryManager, configuration);
+ protected IExternalDataSourceFactory createExternalDataSourceFactory(Map<String, String> configuration)
+ throws HyracksDataException, AsterixException {
+ return DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
}
- protected IDataParserFactory createDataParserFactory(Map<String, String> configuration,
- ILibraryManager libraryManager) throws AsterixException {
- return ParserFactoryProvider.getDataParserFactory(libraryManager, configuration);
+ protected IDataParserFactory createDataParserFactory(Map<String, String> configuration) throws AsterixException {
+ return ParserFactoryProvider.getDataParserFactory(configuration);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index b543804..fc0c4f9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -21,7 +21,6 @@
import java.io.Serializable;
import java.util.Map;
-import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.external.api.ILookupReaderFactory;
import org.apache.asterix.external.api.ILookupRecordReader;
import org.apache.asterix.external.api.IRecordDataParser;
@@ -34,7 +33,7 @@
import org.apache.asterix.external.provider.ParserFactoryProvider;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -79,17 +78,14 @@
}
}
- public void configure(IServiceContext serviceContext, Map<String, String> configuration,
+ public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
- IApplicationContext appCtx = (IApplicationContext) serviceContext.getApplicationContext();
readerFactory =
LookupReaderFactoryProvider.getLookupReaderFactory(serviceContext, configuration, warningCollector);
- dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider
- .getDataParserFactory(appCtx.getLibraryManager(), configuration);
+ dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);
dataParserFactory.setRecordType(recordType);
readerFactory.configure(serviceContext, configuration, warningCollector);
dataParserFactory.configure(configuration);
}
-
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 2533af5..b58e604 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -53,6 +53,8 @@
@Override
public void close() throws IOException {
- feedLogManager.close();
+ if (feedLogManager != null) {
+ feedLogManager.close();
+ }
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index b09752e..b0a4dfdc 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -20,114 +20,292 @@
import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
-import static org.apache.asterix.common.library.LibraryDescriptor.DESCRIPTOR_NAME;
-import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.file.Files;
-import java.util.Collections;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.library.LibraryDescriptor;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
-public class ExternalLibraryManager implements ILibraryManager {
+public final class ExternalLibraryManager implements ILibraryManager, ILifeCycleComponent {
- protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ public static final String LIBRARY_MANAGER_BASE_DIR_NAME = "library";
- static {
- OBJECT_MAPPER.enable(SerializationFeature.INDENT_OUTPUT);
- OBJECT_MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
- OBJECT_MAPPER.configure(ORDER_MAP_ENTRIES_BY_KEYS, true);
+ private static final String STORAGE_DIR_NAME = "storage";
+
+ private static final String TRASH_DIR_NAME = "trash";
+
+ public static final String REV_0_DIR_NAME = "rev_0";
+
+ public static final String REV_1_DIR_NAME = "rev_1";
+
+ public static final String STAGE_DIR_NAME = "stage";
+
+ public static final String CONTENTS_DIR_NAME = "contents";
+
+ public static final String DESCRIPTOR_FILE_NAME = "lib.json";
+
+ private static final Logger LOGGER = LogManager.getLogger(ExternalLibraryManager.class);
+
+ private final NodeControllerService ncs;
+ private final IPersistedResourceRegistry reg;
+ private final ObjectMapper objectMapper;
+ private final FileReference baseDir;
+ private final FileReference storageDir;
+ private final Path storageDirPath;
+ private final FileReference trashDir;
+ private final Path trashDirPath;
+ private final Map<Pair<DataverseName, String>, ILibrary> libraries = new HashMap<>();
+
+ public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir) {
+ this.ncs = ncs;
+ this.reg = reg;
+ baseDir = appDir.getChild(LIBRARY_MANAGER_BASE_DIR_NAME);
+ storageDir = baseDir.getChild(STORAGE_DIR_NAME);
+ storageDirPath = storageDir.getFile().toPath();
+ trashDir = baseDir.getChild(TRASH_DIR_NAME);
+ trashDirPath = trashDir.getFile().toPath().normalize();
+ objectMapper = createObjectMapper();
}
- private static final Logger LOGGER = LogManager.getLogger();
- private final Map<Pair<DataverseName, String>, ILibrary> libraries = Collections.synchronizedMap(new HashMap<>());
- private final IPersistedResourceRegistry reg;
-
- public ExternalLibraryManager(File appDir, IPersistedResourceRegistry reg) {
- this.reg = reg;
- scanLibraries(appDir);
+ public void initStorage(boolean resetStorageData) throws HyracksDataException {
+ try {
+ Path baseDirPath = baseDir.getFile().toPath();
+ if (Files.isDirectory(baseDirPath)) {
+ if (resetStorageData) {
+ FileUtils.cleanDirectory(baseDir.getFile());
+ Files.createDirectory(storageDirPath);
+ Files.createDirectory(trashDirPath);
+ IoUtil.flushDirectory(baseDirPath);
+ } else {
+ boolean createdDirs = false;
+ if (!Files.isDirectory(storageDirPath)) {
+ Files.deleteIfExists(storageDirPath);
+ Files.createDirectory(storageDirPath);
+ createdDirs = true;
+ }
+ if (Files.isDirectory(trashDirPath)) {
+ FileUtils.cleanDirectory(trashDir.getFile());
+ } else {
+ Files.deleteIfExists(trashDirPath);
+ Files.createDirectory(trashDirPath);
+ createdDirs = true;
+ }
+ //TODO:clean all rev_0 if their rev_1 exist
+ if (createdDirs) {
+ IoUtil.flushDirectory(baseDirPath);
+ }
+ }
+ } else {
+ FileUtil.forceMkdirs(baseDir.getFile());
+ Files.createDirectory(storageDirPath);
+ Files.createDirectory(trashDirPath);
+ // flush app dir's parent because we might've created app dir there
+ IoUtil.flushDirectory(baseDirPath.getParent().getParent());
+ // flush app dir (base dir's parent) because we might've created base dir there
+ IoUtil.flushDirectory(baseDirPath.getParent());
+ // flush base dir because we created storage/trash dirs there
+ IoUtil.flushDirectory(baseDirPath);
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
}
@Override
- public void scanLibraries(File appDir) {
- File[] libs = appDir.listFiles((dir, name) -> dir.isDirectory());
- if (libs != null) {
- for (File lib : libs) {
- String libraryPath = lib.getAbsolutePath();
+ public void start() {
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream ouputStream) {
+ synchronized (this) {
+ for (Map.Entry<Pair<DataverseName, String>, ILibrary> p : libraries.entrySet()) {
+ ILibrary library = p.getValue();
try {
- setUpDeployedLibrary(libraryPath);
- } catch (AsterixException | IOException e) {
- LOGGER.error("Unable to properly initialize external libraries", e);
+ library.close();
+ } catch (HyracksDataException e) {
+ LOGGER.warn("Error closing library " + p.getKey().first + "." + p.getKey().second, e);
}
}
}
}
- public void register(DataverseName dataverseName, String libraryName, ILibrary library) {
- Pair<DataverseName, String> key = getKey(dataverseName, libraryName);
- libraries.put(key, library);
+ private FileReference getDataverseDir(DataverseName dataverseName) throws HyracksDataException {
+ return getChildFileRef(storageDir, dataverseName.getCanonicalForm());
}
@Override
- public void deregister(DataverseName dataverseName, String libraryName) {
+ public FileReference getLibraryDir(DataverseName dataverseName, String libraryName) throws HyracksDataException {
+ FileReference dataverseDir = getDataverseDir(dataverseName);
+ return getChildFileRef(dataverseDir, libraryName);
+ }
+
+ @Override
+ public ILibrary getLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException {
Pair<DataverseName, String> key = getKey(dataverseName, libraryName);
- ILibrary cl = libraries.remove(key);
- if (cl != null) {
- cl.close();
+ synchronized (this) {
+ ILibrary library = libraries.get(key);
+ if (library == null) {
+ library = loadLibrary(dataverseName, libraryName);
+ libraries.put(key, library);
+ }
+ return library;
}
}
- @Override
- public void setUpDeployedLibrary(String libraryPath) throws IOException, AsterixException {
- // get the installed library dirs
- String[] parts = libraryPath.split(File.separator);
- DataverseName catenatedDv = DataverseName.createFromCanonicalForm(parts[parts.length - 1]);
- String name = catenatedDv.getParts().get(catenatedDv.getParts().size() - 1);
- DataverseName dataverse = DataverseName.create(catenatedDv.getParts(), 0, catenatedDv.getParts().size() - 1);
+ private ILibrary loadLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException {
+ FileReference libRevDir = findLibraryRevDir(dataverseName, libraryName);
+ if (libRevDir == null) {
+ throw new HyracksDataException("Cannot find library: " + dataverseName + '.' + libraryName);
+ }
+ FileReference libContentsDir = libRevDir.getChild(CONTENTS_DIR_NAME);
+ if (!libContentsDir.getFile().isDirectory()) {
+ throw new HyracksDataException("Cannot find library: " + dataverseName + '.' + libraryName);
+ }
try {
- File langFile = new File(libraryPath, DESCRIPTOR_NAME);
- final JsonNode jsonNode = OBJECT_MAPPER.readValue(Files.readAllBytes(langFile.toPath()), JsonNode.class);
- LibraryDescriptor desc = (LibraryDescriptor) reg.deserialize(jsonNode);
- ExternalFunctionLanguage libLang = desc.getLang();
+ FileReference descFile = libRevDir.getChild(DESCRIPTOR_FILE_NAME);
+ byte[] descData = Files.readAllBytes(descFile.getFile().toPath());
+ LibraryDescriptor desc = deserializeLibraryDescriptor(descData);
+ ExternalFunctionLanguage libLang = desc.getLanguage();
switch (libLang) {
case JAVA:
- register(dataverse, name, new JavaLibrary(libraryPath));
- break;
+ return new JavaLibrary(libContentsDir.getFile());
case PYTHON:
- register(dataverse, name, new PythonLibrary(libraryPath));
- break;
+ return new PythonLibrary(libContentsDir.getFile());
default:
- throw new IllegalStateException("Library path file refers to unknown language");
+ throw new HyracksDataException("Invalid language: " + libraryName);
}
- } catch (IOException | AsterixException e) {
- LOGGER.error("Failed to initialized library", e);
- throw e;
+ } catch (IOException e) {
+ LOGGER.error("Failed to initialize library " + dataverseName + '.' + libraryName, e);
+ throw HyracksDataException.create(e);
}
}
@Override
- public ILibrary getLibrary(DataverseName dataverseName, String libraryName) {
+ public byte[] serializeLibraryDescriptor(LibraryDescriptor libraryDescriptor) throws HyracksDataException {
+ try {
+ return objectMapper.writeValueAsBytes(libraryDescriptor.toJson(reg));
+ } catch (JsonProcessingException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private LibraryDescriptor deserializeLibraryDescriptor(byte[] data) throws IOException {
+ JsonNode jsonNode = objectMapper.readValue(data, JsonNode.class);
+ return (LibraryDescriptor) reg.deserialize(jsonNode);
+ }
+
+ private FileReference findLibraryRevDir(DataverseName dataverseName, String libraryName)
+ throws HyracksDataException {
+ FileReference libraryBaseDir = getLibraryDir(dataverseName, libraryName);
+ if (!libraryBaseDir.getFile().isDirectory()) {
+ return null;
+ }
+ FileReference libDirRev1 = libraryBaseDir.getChild(REV_1_DIR_NAME);
+ if (libDirRev1.getFile().isDirectory()) {
+ return libDirRev1;
+ }
+ FileReference libDirRev0 = libraryBaseDir.getChild(REV_0_DIR_NAME);
+ if (libDirRev0.getFile().isDirectory()) {
+ return libDirRev0;
+ }
+ return null;
+ }
+
+ @Override
+ public void closeLibrary(DataverseName dataverseName, String libraryName) throws HyracksDataException {
Pair<DataverseName, String> key = getKey(dataverseName, libraryName);
- return libraries.get(key);
+ ILibrary library;
+ synchronized (this) {
+ library = libraries.remove(key);
+ }
+ if (library != null) {
+ library.close();
+ }
+ }
+
+ @Override
+ public void dumpState(OutputStream os) {
}
private static Pair<DataverseName, String> getKey(DataverseName dataverseName, String libraryName) {
return new Pair<>(dataverseName, libraryName);
}
+ @Override
+ public void dropLibraryPath(FileReference fileRef) throws HyracksDataException {
+ // does not flush any directories
+ try {
+ Path path = fileRef.getFile().toPath();
+ Path trashPath = Files.createTempDirectory(trashDirPath, null);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Drop (move) {} into {}", path, trashPath);
+ }
+ Files.move(path, trashPath, StandardCopyOption.ATOMIC_MOVE);
+ ncs.getWorkQueue().schedule(new DeleteDirectoryWork(trashPath));
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private FileReference getChildFileRef(FileReference dir, String fileName) throws HyracksDataException {
+ Path dirPath = dir.getFile().toPath().toAbsolutePath().normalize();
+ FileReference fileRef = dir.getChild(fileName);
+ Path filePath = fileRef.getFile().toPath().toAbsolutePath().normalize();
+ if (!filePath.startsWith(dirPath)) {
+ throw new HyracksDataException("Invalid file name: " + fileName);
+ }
+ return fileRef;
+ }
+
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper om = new ObjectMapper();
+ om.enable(SerializationFeature.INDENT_OUTPUT);
+ om.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
+ om.configure(ORDER_MAP_ENTRIES_BY_KEYS, true);
+ return om;
+ }
+
+ private static final class DeleteDirectoryWork extends AbstractWork {
+
+ private final Path path;
+
+ private DeleteDirectoryWork(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public void run() {
+ try {
+ IoUtil.delete(path);
+ } catch (HyracksDataException e) {
+ LOGGER.warn("Error deleting " + path);
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
index ce23cac..3ff706d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
@@ -19,7 +19,7 @@
package org.apache.asterix.external.library;
-import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.types.IAType;
@@ -44,6 +44,6 @@
argEvals[i] = args[i].createScalarEvaluator(context);
}
libraryManager =
- ((IApplicationContext) context.getServiceContext().getApplicationContext()).getLibraryManager();
+ ((INcApplicationContext) context.getServiceContext().getApplicationContext()).getLibraryManager();
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaLibrary.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaLibrary.java
index c7e7c09..0ef651d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaLibrary.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaLibrary.java
@@ -26,9 +26,9 @@
import java.net.MalformedURLException;
import java.net.URL;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,22 +38,17 @@
private static final Logger LOGGER = LogManager.getLogger();
- public JavaLibrary(String libraryPath) throws AsterixException, MalformedURLException {
-
- File installDir = new File(libraryPath);
-
- // get a reference to the specific library dir
- File libDir = installDir;
+ public JavaLibrary(File libDir) throws HyracksDataException, MalformedURLException {
FilenameFilter jarFileFilter = (dir, name) -> name.endsWith(".jar");
// Get the jar file <Allow only a single jar file>
String[] jarsInLibDir = libDir.list(jarFileFilter);
if (jarsInLibDir.length > 1) {
- throw new AsterixException("Incorrect library structure: found multiple library jars");
+ throw new HyracksDataException("Incorrect library structure: found multiple library jars");
}
if (jarsInLibDir.length <= 0) {
- throw new AsterixException("Incorrect library structure: could not find library jar");
+ throw new HyracksDataException("Incorrect library structure: could not find library jar");
}
File libJar = new File(libDir, jarsInLibDir[0]);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibrary.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibrary.java
index 3ce3e91..8d388fd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibrary.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibrary.java
@@ -29,8 +29,8 @@
private final File path;
- PythonLibrary(String path) {
- this.path = new File(path);
+ PythonLibrary(File libDir) {
+ this.path = libDir;
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
new file mode 100644
index 0000000..5927145
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractLibraryOperatorDescriptor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+abstract class AbstractLibraryOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final DataverseName dataverseName;
+
+ protected final String libraryName;
+
+ public AbstractLibraryOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+ String libraryName) {
+ super(spec, 0, 0);
+ this.dataverseName = dataverseName;
+ this.libraryName = libraryName;
+ }
+
+ protected abstract class AbstractLibraryNodePushable extends AbstractOperatorNodePushable {
+
+ protected final IHyracksTaskContext ctx;
+
+ protected IIOManager ioManager;
+
+ protected ILibraryManager libraryManager;
+
+ private FileReference libraryDir;
+
+ protected AbstractLibraryNodePushable(IHyracksTaskContext ctx) {
+ this.ctx = ctx;
+ }
+
+ protected abstract void execute() throws IOException;
+
+ @Override
+ public final void initialize() throws HyracksDataException {
+ INcApplicationContext runtimeCtx =
+ (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+ ioManager = runtimeCtx.getIoManager();
+ libraryManager = runtimeCtx.getLibraryManager();
+ libraryDir = libraryManager.getLibraryDir(dataverseName, libraryName);
+ try {
+ execute();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ protected FileReference getLibraryDir() {
+ return libraryDir;
+ }
+
+ protected FileReference getRev0Dir() {
+ return libraryDir.getChild(ExternalLibraryManager.REV_0_DIR_NAME);
+ }
+
+ protected FileReference getRev1Dir() {
+ return libraryDir.getChild(ExternalLibraryManager.REV_1_DIR_NAME);
+ }
+
+ protected FileReference getStageDir() {
+ return libraryDir.getChild(ExternalLibraryManager.STAGE_DIR_NAME);
+ }
+
+ // does not flush any directories
+ protected void dropIfExists(FileReference fileRef) throws HyracksDataException {
+ if (fileRef.getFile().exists()) {
+ libraryManager.dropLibraryPath(fileRef);
+ }
+ }
+
+ protected void move(FileReference src, FileReference dest) throws IOException {
+ dropIfExists(dest);
+ Files.move(src.getFile().toPath(), dest.getFile().toPath(), StandardCopyOption.ATOMIC_MOVE);
+ }
+
+ protected void mkdir(FileReference dir) throws IOException {
+ Files.createDirectory(dir.getFile().toPath());
+ }
+
+ protected void flushDirectory(FileReference dir) throws IOException {
+ flushDirectory(dir.getFile());
+ }
+
+ protected void flushDirectory(File dir) throws IOException {
+ IoUtil.flushDirectory(dir);
+ }
+
+ protected void flushDirectory(Path dir) throws IOException {
+ IoUtil.flushDirectory(dir);
+ }
+
+ protected void closeLibrary() throws HyracksDataException {
+ libraryManager.closeLibrary(dataverseName, libraryName);
+ }
+
+ @Override
+ public final void deinitialize() {
+ }
+
+ @Override
+ public final int getInputArity() {
+ return 0;
+ }
+
+ @Override
+ public final void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 569e5a1..27ae55c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.api.IFeed;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -50,7 +51,7 @@
private static final String FEED_EXTENSION_NAME = "Feed";
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private static final Logger LOGGER = LogManager.getLogger();
@@ -62,6 +63,7 @@
/** The adaptor factory that is used to create an instance of the feed adaptor **/
private ITypedAdapterFactory adaptorFactory;
/** The library that contains the adapter in use. **/
+ private DataverseName adaptorLibraryDataverse;
private String adaptorLibraryName;
/**
* The adapter factory class that is used to create an instance of the feed adapter.
@@ -81,12 +83,13 @@
this.outRecDescs[0] = rDesc;
}
- public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed feed, String adapterLibraryName,
- String adapterFactoryClassName, ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor,
- RecordDescriptor rDesc) {
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed feed, DataverseName adapterLibraryDataverse,
+ String adapterLibraryName, String adapterFactoryClassName, ARecordType adapterOutputType,
+ FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
super(spec, 0, 1);
this.feedId = new EntityId(FEED_EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
this.adaptorFactoryClassName = adapterFactoryClassName;
+ this.adaptorLibraryDataverse = adapterLibraryDataverse;
this.adaptorLibraryName = adapterLibraryName;
this.adaptorConfiguration = feed.getConfiguration();
this.adapterOutputType = adapterOutputType;
@@ -108,8 +111,7 @@
INcApplicationContext runtimeCtx =
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
ILibraryManager libraryManager = runtimeCtx.getLibraryManager();
-
- ILibrary lib = libraryManager.getLibrary(feedId.getDataverseName(), adaptorLibraryName);
+ ILibrary lib = libraryManager.getLibrary(adaptorLibraryDataverse, adaptorLibraryName);
if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
throw new HyracksDataException("Unexpected library language: " + lib.getLanguage());
}
@@ -118,8 +120,7 @@
try {
adapterFactory = (ITypedAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance());
adapterFactory.setOutputType(adapterOutputType);
- adapterFactory.configure(ctx.getJobletContext().getServiceContext(), adaptorConfiguration,
- ctx.getWarningCollector());
+ adapterFactory.configure(null, adaptorConfiguration, ctx.getWarningCollector());
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -153,6 +154,10 @@
return this.policyAccessor;
}
+ public DataverseName getAdaptorLibraryDataverse() {
+ return adaptorLibraryDataverse;
+ }
+
public String getAdaptorLibraryName() {
return this.adaptorLibraryName;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployAbortOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployAbortOperatorDescriptor.java
new file mode 100644
index 0000000..6908361
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployAbortOperatorDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class LibraryDeployAbortOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = LogManager.getLogger(LibraryDeployAbortOperatorDescriptor.class);
+
+ public LibraryDeployAbortOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+ String libraryName) {
+ super(spec, dataverseName, libraryName);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AbstractLibraryNodePushable(ctx) {
+ @Override
+ protected void execute() throws IOException {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Abort deployment of library {}.{}", dataverseName, libraryName);
+ }
+
+ FileReference libDir = getLibraryDir();
+
+ FileReference rev0 = getRev0Dir();
+ FileReference rev1 = getRev1Dir();
+ if (rev0.getFile().exists() || rev1.getFile().exists()) {
+ // #. drop 'stage' dir.
+ FileReference stage = getStageDir();
+ dropIfExists(stage);
+ flushDirectory(libDir);
+ } else {
+ // #. drop the whole library dir
+ dropIfExists(libDir);
+ flushDirectory(libDir.getFile().getParentFile());
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployCommitOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployCommitOperatorDescriptor.java
new file mode 100644
index 0000000..30635a7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployCommitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class LibraryDeployCommitOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = LogManager.getLogger(LibraryDeployCommitOperatorDescriptor.class);
+
+ public LibraryDeployCommitOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+ String libraryName) {
+ super(spec, dataverseName, libraryName);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AbstractLibraryNodePushable(ctx) {
+ @Override
+ protected void execute() throws IOException {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Commit deployment of library {}.{}", dataverseName, libraryName);
+ }
+
+ // #. rename 'stage' dir into 'rev_1' dir
+ FileReference rev1 = getRev1Dir();
+ FileReference stage = getStageDir();
+ move(stage, rev1);
+
+ // #. flush library dir
+ FileReference libDir = getLibraryDir();
+ flushDirectory(libDir);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
new file mode 100644
index 0000000..a576c34
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.library.LibraryDescriptor;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int DOWNLOAD_RETRY_COUNT = 10;
+
+ private static final Logger LOGGER = LogManager.getLogger(LibraryDeployPrepareOperatorDescriptor.class);
+
+ private final ExternalFunctionLanguage language;
+ private final URI libLocation;
+ private final String authToken;
+
+ public LibraryDeployPrepareOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+ String libraryName, ExternalFunctionLanguage language, URI libLocation, String authToken) {
+ super(spec, dataverseName, libraryName);
+ this.language = language;
+ this.libLocation = libLocation;
+ this.authToken = authToken;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AbstractLibraryNodePushable(ctx) {
+
+ private byte[] copyBuffer;
+
+ @Override
+ protected void execute() throws IOException {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Prepare deployment of library {}.{}", dataverseName, libraryName);
+ }
+
+ // #. create library dir if necessary, clean 'stage' dir
+ FileReference libDir = getLibraryDir();
+ Path libDirPath = libDir.getFile().toPath();
+
+ FileReference stage = getStageDir();
+ if (Files.isDirectory(libDirPath)) {
+ dropIfExists(stage);
+ } else {
+ dropIfExists(libDir);
+ FileUtil.forceMkdirs(libDir.getFile());
+ Path dataverseDir = libDirPath.getParent();
+ flushDirectory(dataverseDir); // might've created this dir
+ flushDirectory(dataverseDir.getParent()); // might've created this dir
+ }
+ mkdir(stage);
+
+ // #. download new content into 'stage' dir
+ fetch(stage);
+
+ // #. close the library (close its open files if any)
+ closeLibrary();
+
+ // #. if 'rev_1' dir exists then rename 'rev_1' dir to 'rev_0' dir.
+ FileReference rev1 = getRev1Dir();
+ if (rev1.getFile().exists()) {
+ FileReference rev0 = getRev0Dir();
+ move(rev1, rev0);
+ }
+
+ // #. flush library dir
+ flushDirectory(libDir);
+ }
+
+ private void fetch(FileReference stageDir) throws IOException {
+
+ String libLocationPath = libLocation.getPath();
+ String fileExt = FilenameUtils.getExtension(libLocationPath);
+
+ FileReference targetFile = stageDir.getChild("lib." + fileExt);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Downloading library from {} into {}", libLocation, targetFile);
+ }
+ download(targetFile);
+
+ // extract from the archive
+ FileReference contentsDir = stageDir.getChild(ExternalLibraryManager.CONTENTS_DIR_NAME);
+ mkdir(contentsDir);
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Extracting library from {} into {}", targetFile, contentsDir);
+ }
+
+ switch (language) {
+ case JAVA:
+ if (!LibraryDescriptor.FILE_EXT_ZIP.equals(fileExt)) {
+ // shouldn't happen
+ throw new IOException("Unexpected file type: " + fileExt);
+ }
+ unzip(targetFile, contentsDir);
+ break;
+ case PYTHON:
+ if (!LibraryDescriptor.FILE_EXT_PYZ.equals(fileExt)) {
+ // shouldn't happen
+ throw new IOException("Unexpected file type: " + fileExt);
+ }
+ shiv(targetFile, stageDir, contentsDir);
+ break;
+ default:
+ // shouldn't happen
+ throw new IOException("Unexpected language: " + language);
+ }
+
+ // write library descriptor
+ FileReference targetDescFile = stageDir.getChild(DESCRIPTOR_FILE_NAME);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Writing library descriptor into {}", targetDescFile);
+ }
+ writeDescriptor(targetDescFile, new LibraryDescriptor(language));
+
+ flushDirectory(contentsDir);
+ flushDirectory(stageDir);
+ }
+
+ private void download(FileReference targetFile) throws HyracksException {
+ try {
+ targetFile.getFile().createNewFile();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ IFileHandle fHandle = ioManager.open(targetFile, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+ try {
+ // retry 10 times at maximum for downloading binaries
+ HttpGet request = new HttpGet(libLocation);
+ request.setHeader(HttpHeaders.AUTHORIZATION, authToken);
+ int tried = 0;
+ Exception trace = null;
+ while (tried < DOWNLOAD_RETRY_COUNT) {
+ tried++;
+ CloseableHttpResponse response = null;
+ try {
+ response = httpClient.execute(request);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new IOException("Http Error: " + response.getStatusLine().getStatusCode());
+ }
+ HttpEntity e = response.getEntity();
+ if (e == null) {
+ throw new IOException("No response");
+ }
+ WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
+ OutputStream outStream = Channels.newOutputStream(outChannel);
+ e.writeTo(outStream);
+ outStream.flush();
+ ioManager.sync(fHandle, true);
+ return;
+ } catch (IOException e) {
+ LOGGER.error("Unable to download library", e);
+ trace = e;
+ try {
+ ioManager.truncate(fHandle, 0);
+ } catch (IOException e2) {
+ throw HyracksDataException.create(e2);
+ }
+ } finally {
+ if (response != null) {
+ try {
+ response.close();
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close", e);
+ }
+ }
+ }
+ }
+
+ throw HyracksDataException.create(trace);
+ } finally {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close", e);
+ }
+ }
+ } finally {
+ try {
+ ioManager.close(fHandle);
+ } catch (HyracksDataException e) {
+ LOGGER.warn("Failed to close", e);
+ }
+ }
+ }
+
+ private void unzip(FileReference sourceFile, FileReference outputDir) throws IOException {
+ boolean logTraceEnabled = LOGGER.isTraceEnabled();
+ Set<Path> newDirs = new HashSet<>();
+ Path outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize();
+ try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) {
+ Enumeration<? extends ZipEntry> entries = zipFile.entries();
+ while (entries.hasMoreElements()) {
+ ZipEntry entry = entries.nextElement();
+ if (entry.isDirectory()) {
+ continue;
+ }
+ Path entryOutputPath = outputDirPath.resolve(entry.getName()).toAbsolutePath().normalize();
+ if (!entryOutputPath.startsWith(outputDirPath)) {
+ throw new IOException("Malformed ZIP archive: " + entry.getName());
+ }
+ Path entryOutputDir = entryOutputPath.getParent();
+ Files.createDirectories(entryOutputDir);
+ // remember new directories so we can flush them later
+ for (Path p = entryOutputDir; !p.equals(outputDirPath); p = p.getParent()) {
+ newDirs.add(p);
+ }
+ try (InputStream in = zipFile.getInputStream(entry)) {
+ FileReference entryOutputFileRef =
+ ioManager.resolveAbsolutePath(entryOutputPath.toString());
+ if (logTraceEnabled) {
+ LOGGER.trace("Extracting file {}", entryOutputFileRef);
+ }
+ writeAndForce(entryOutputFileRef, in);
+ }
+ }
+ }
+ for (Path newDir : newDirs) {
+ flushDirectory(newDir);
+ }
+ }
+
+ private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir)
+ throws IOException {
+ FileReference pyro4 = stageDir.getChild("pyro4.pyz");
+ writeShim(pyro4);
+ unzip(sourceFile, contentsDir);
+ unzip(pyro4, contentsDir);
+ writeShim(contentsDir.getChild("entrypoint.py"));
+ Files.delete(pyro4.getFile().toPath());
+ }
+
+ private void writeShim(FileReference outputFile) throws IOException {
+ InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName());
+ if (is == null) {
+ throw new IOException("Classpath does not contain necessary Python resources!");
+ }
+ try {
+ writeAndForce(outputFile, is);
+ } finally {
+ is.close();
+ }
+ }
+
+ private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException {
+ byte[] bytes = libraryManager.serializeLibraryDescriptor(desc);
+ writeAndForce(descFile, new ByteArrayInputStream(bytes));
+ }
+
+ private void writeAndForce(FileReference outputFile, InputStream dataStream) throws IOException {
+ outputFile.getFile().createNewFile();
+ IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
+ OutputStream outputStream = Channels.newOutputStream(outChannel);
+ IOUtils.copyLarge(dataStream, outputStream, getCopyBuffer());
+ outputStream.flush();
+ ioManager.sync(fHandle, true);
+ } finally {
+ ioManager.close(fHandle);
+ }
+ }
+
+ private byte[] getCopyBuffer() {
+ if (copyBuffer == null) {
+ copyBuffer = new byte[4096];
+ }
+ return copyBuffer;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryUndeployOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryUndeployOperatorDescriptor.java
new file mode 100644
index 0000000..a146ee3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryUndeployOperatorDescriptor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.operators;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class LibraryUndeployOperatorDescriptor extends AbstractLibraryOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = LogManager.getLogger(LibraryUndeployOperatorDescriptor.class);
+
+ public LibraryUndeployOperatorDescriptor(IOperatorDescriptorRegistry spec, DataverseName dataverseName,
+ String libraryName) {
+ super(spec, dataverseName, libraryName);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new AbstractLibraryNodePushable(ctx) {
+ @Override
+ protected void execute() throws IOException {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Undeploying library {}.{}", dataverseName, libraryName);
+ }
+
+ try {
+ // #. close the library (close its open files if any)
+ closeLibrary();
+ } finally {
+ // #. drop the whole library dir
+ FileReference libDir = getLibraryDir();
+ dropIfExists(libDir);
+ flushDirectory(libDir.getFile().getParentFile());
+ }
+ }
+ };
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 96f06f4..a4fdcfb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -30,7 +30,7 @@
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -44,7 +44,7 @@
}
// get adapter factory. this method has the side effect of modifying the configuration as necessary
- public static ITypedAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName,
+ public static ITypedAdapterFactory getAdapterFactory(ICCServiceContext serviceCtx, String adapterName,
Map<String, String> configuration, ARecordType itemType, ARecordType metaType,
IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
ExternalDataUtils.defaultConfiguration(configuration);
@@ -59,7 +59,7 @@
}
// get indexing adapter factory. this method has the side effect of modifying the configuration as necessary
- public static IIndexingAdapterFactory getIndexingAdapterFactory(IServiceContext serviceCtx, String adapterName,
+ public static IIndexingAdapterFactory getIndexingAdapterFactory(ICCServiceContext serviceCtx, String adapterName,
Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp,
ARecordType metaType, IWarningCollector warningCollector) throws HyracksDataException, AlgebricksException {
ExternalDataUtils.defaultConfiguration(configuration);
@@ -73,7 +73,7 @@
}
// Lookup Adapters
- public static LookupAdapterFactory<?> getLookupAdapterFactory(IServiceContext serviceCtx,
+ public static LookupAdapterFactory<?> getLookupAdapterFactory(ICCServiceContext serviceCtx,
Map<String, String> configuration, ARecordType recordType, int[] ridFields, boolean retainInput,
boolean retainMissing, IMissingWriterFactory missingWriterFactory, IWarningCollector warningCollector)
throws HyracksDataException, AlgebricksException {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 5650759..8fa0aca 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -31,8 +31,6 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IInputStreamFactory;
@@ -42,6 +40,7 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class DatasourceFactoryProvider {
@@ -52,26 +51,27 @@
private DatasourceFactoryProvider() {
}
- public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager,
- Map<String, String> configuration) throws HyracksDataException, AsterixException {
+ public static IExternalDataSourceFactory getExternalDataSourceFactory(Map<String, String> configuration)
+ throws HyracksDataException, AsterixException {
// Take a copy of the configuration
if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) {
String reader = configuration.get(ExternalDataConstants.KEY_READER);
- return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration);
+ return DatasourceFactoryProvider.getRecordReaderFactory(reader, configuration);
} else {
// get stream source
String streamSource = configuration.get(ExternalDataConstants.KEY_STREAM_SOURCE);
- return DatasourceFactoryProvider.getInputStreamFactory(libraryManager, streamSource, configuration);
+ return DatasourceFactoryProvider.getInputStreamFactory(streamSource, configuration);
}
}
- public static IInputStreamFactory getInputStreamFactory(ILibraryManager libraryManager, String streamSource,
- Map<String, String> configuration) throws HyracksDataException {
+ public static IInputStreamFactory getInputStreamFactory(String streamSource, Map<String, String> configuration)
+ throws HyracksDataException {
IInputStreamFactory streamSourceFactory;
if (ExternalDataUtils.isExternal(streamSource)) {
- DataverseName dataverse = ExternalDataUtils.getDataverse(configuration);
- streamSourceFactory =
- ExternalDataUtils.createExternalInputStreamFactory(libraryManager, dataverse, streamSource);
+ //DataverseName dataverse = ExternalDataUtils.getDataverse(configuration);
+ //streamSourceFactory =
+ // ExternalDataUtils.createExternalInputStreamFactory(libraryManager, dataverse, streamSource);
+ throw new NotImplementedException();
} else {
switch (streamSource) {
case ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS:
@@ -104,10 +104,11 @@
}
}
- public static IRecordReaderFactory getRecordReaderFactory(ILibraryManager libraryManager, String adaptorName,
- Map<String, String> configuration) throws HyracksDataException, AsterixException {
+ public static IRecordReaderFactory getRecordReaderFactory(String adaptorName, Map<String, String> configuration)
+ throws HyracksDataException, AsterixException {
if (adaptorName.equals(ExternalDataConstants.EXTERNAL)) {
- return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration);
+ //return ExternalDataUtils.createExternalRecordReaderFactory(libraryManager, configuration);
+ throw new NotImplementedException();
}
if (factories == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 2265a25..4c10339 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -29,11 +29,11 @@
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
public class ParserFactoryProvider {
@@ -43,13 +43,13 @@
private ParserFactoryProvider() {
}
- public static IDataParserFactory getDataParserFactory(ILibraryManager libraryManager,
- Map<String, String> configuration) throws AsterixException {
+ public static IDataParserFactory getDataParserFactory(Map<String, String> configuration) throws AsterixException {
IDataParserFactory parserFactory;
String parserFactoryName = configuration.get(ExternalDataConstants.KEY_PARSER);
if (ExternalDataUtils.isExternal(parserFactoryName)) {
- return ExternalDataUtils.createExternalParserFactory(libraryManager,
- ExternalDataUtils.getDataverse(configuration), parserFactoryName);
+ //return ExternalDataUtils.createExternalParserFactory(libraryManager,
+ // ExternalDataUtils.getDataverse(configuration), parserFactoryName);
+ throw new NotImplementedException();
} else {
String parserFactoryKey = ExternalDataUtils.getParserFactory(configuration);
parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 052a8e0..b70c6ad2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -222,6 +222,7 @@
return value == null ? false : Boolean.valueOf(value);
}
+ // Currently not used.
public static IRecordReaderFactory<?> createExternalRecordReaderFactory(ILibraryManager libraryManager,
Map<String, String> configuration) throws AsterixException {
String readerFactory = configuration.get(ExternalDataConstants.KEY_READER_FACTORY);
@@ -234,14 +235,19 @@
throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
+ " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
}
- String[] dataverseAndLibrary = libraryAndFactory[0].split(".");
+ String[] dataverseAndLibrary = libraryAndFactory[0].split("\\.");
if (dataverseAndLibrary.length != 2) {
throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER_FACTORY
+ " must follow the format \"DataverseName.LibraryName#ReaderFactoryFullyQualifiedName\"");
}
DataverseName dataverseName = DataverseName.createSinglePartName(dataverseAndLibrary[0]); //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
String libraryName = dataverseAndLibrary[1];
- ILibrary lib = libraryManager.getLibrary(dataverseName, libraryName);
+ ILibrary lib;
+ try {
+ lib = libraryManager.getLibrary(dataverseName, libraryName);
+ } catch (HyracksDataException e) {
+ throw new AsterixException("Cannot load library", e);
+ }
if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
throw new AsterixException("Unexpected library language: " + lib.getLanguage());
}
@@ -253,12 +259,18 @@
}
}
+ // Currently not used.
public static IDataParserFactory createExternalParserFactory(ILibraryManager libraryManager,
DataverseName dataverse, String parserFactoryName) throws AsterixException {
try {
String library = parserFactoryName.substring(0,
parserFactoryName.indexOf(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR));
- ILibrary lib = libraryManager.getLibrary(dataverse, library);
+ ILibrary lib;
+ try {
+ lib = libraryManager.getLibrary(dataverse, library);
+ } catch (HyracksDataException e) {
+ throw new AsterixException("Cannot load library", e);
+ }
if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
throw new AsterixException("Unexpected library language: " + lib.getLanguage());
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 0bd601b..f36e35b 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -24,16 +24,13 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IDataSourceAdapter;
-import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataflow.TupleForwarder;
import org.apache.asterix.external.parser.ADMDataParser;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -52,20 +49,14 @@
private Map<String, String> configuration;
- private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
-
- private transient IServiceContext serviceContext;
-
@Override
public String getAlias() {
return "test_typed";
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
- clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(
- (ICcApplicationContext) serviceContext.getApplicationContext(), clusterLocations, 1);
- return clusterLocations;
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ throw new IllegalStateException(); // shouldn't be called for external adapters
}
@Override
@@ -114,9 +105,8 @@
}
@Override
- public void configure(IServiceContext serviceContext, Map<String, String> configuration,
+ public void configure(ICCServiceContext serviceContext, Map<String, String> configuration,
IWarningCollector warningCollector) {
- this.serviceContext = serviceContext;
this.configuration = configuration;
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
index 42f1f99..dbcfe0a 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/factory/TestRecordWithPKParserFactory.java
@@ -54,8 +54,7 @@
format = configuration.get(ExternalDataConstants.KEY_RECORD_FORMAT);
parserFormats.add(format);
parserConf.put(ExternalDataConstants.KEY_FORMAT, format);
- recordParserFactory =
- (IRecordDataParserFactory<char[]>) ParserFactoryProvider.getDataParserFactory(null, parserConf);
+ recordParserFactory = (IRecordDataParserFactory<char[]>) ParserFactoryProvider.getDataParserFactory(parserConf);
recordParserFactory.setRecordType(recordType);
recordParserFactory.configure(configuration);
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index ba6e7b2..1e3e68f 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -79,8 +79,10 @@
DROP_FEED_POLICY,
CREATE_FUNCTION,
CREATE_ADAPTER,
+ CREATE_LIBRARY,
FUNCTION_DROP,
ADAPTER_DROP,
+ LIBRARY_DROP,
CREATE_SYNONYM,
SYNONYM_DROP,
COMPACT,
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java
new file mode 100644
index 0000000..ebdeeef0
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.lang.common.statement;
+
+import java.net.URI;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.AbstractStatement;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+
+public final class CreateLibraryStatement extends AbstractStatement {
+
+ private final DataverseName dataverseName;
+ private final String libraryName;
+ private final ExternalFunctionLanguage lang;
+ private final URI location;
+ private final boolean replaceIfExists;
+ private final String authToken;
+
+ public CreateLibraryStatement(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage lang,
+ URI location, boolean replaceIfExists, String authToken) {
+ this.dataverseName = dataverseName;
+ this.libraryName = libraryName;
+ this.lang = lang;
+ this.location = location;
+ this.replaceIfExists = replaceIfExists;
+ this.authToken = authToken;
+ }
+
+ public DataverseName getDataverseName() {
+ return dataverseName;
+ }
+
+ public String getLibraryName() {
+ return libraryName;
+ }
+
+ public ExternalFunctionLanguage getLang() {
+ return lang;
+ }
+
+ public URI getLocation() {
+ return location;
+ }
+
+ public boolean getReplaceIfExists() {
+ return replaceIfExists;
+ }
+
+ public String getAuthToken() {
+ return authToken;
+ }
+
+ @Override
+ public Kind getKind() {
+ return Kind.CREATE_LIBRARY;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+ return visitor.visit(this, arg);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/LibraryDropStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/LibraryDropStatement.java
new file mode 100644
index 0000000..09ea3a8
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/LibraryDropStatement.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.AbstractStatement;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+
+public final class LibraryDropStatement extends AbstractStatement {
+
+ private final DataverseName dataverseName;
+ private final String libraryName;
+
+ public LibraryDropStatement(DataverseName dataverseName, String libraryName) {
+ this.dataverseName = dataverseName;
+ this.libraryName = libraryName;
+ }
+
+ public DataverseName getDataverseName() {
+ return dataverseName;
+ }
+
+ public String getLibraryName() {
+ return libraryName;
+ }
+
+ @Override
+ public Kind getKind() {
+ return Kind.LIBRARY_DROP;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+ return visitor.visit(this, arg);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 47e934e..62763d1 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -72,6 +72,7 @@
import org.apache.asterix.lang.common.statement.CreateFeedStatement;
import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
import org.apache.asterix.lang.common.statement.CreateSynonymStatement;
import org.apache.asterix.lang.common.statement.DatasetDecl;
import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -87,6 +88,7 @@
import org.apache.asterix.lang.common.statement.IndexDropStatement;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
import org.apache.asterix.lang.common.statement.LoadStatement;
import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -853,6 +855,18 @@
}
@Override
+ public Void visit(CreateLibraryStatement cls, Integer arg) throws CompilationException {
+ // this statement is internal
+ return null;
+ }
+
+ @Override
+ public Void visit(LibraryDropStatement del, Integer arg) throws CompilationException {
+ // this statement is internal
+ return null;
+ }
+
+ @Override
public Void visit(ListSliceExpression expression, Integer step) throws CompilationException {
out.println(skip(step) + "ListSliceExpression [");
expression.getExpr().accept(this, step + 1);
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
index f6b2f8d..9f4571d 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
@@ -33,6 +33,7 @@
import org.apache.asterix.lang.common.statement.CreateFeedStatement;
import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
import org.apache.asterix.lang.common.statement.CreateSynonymStatement;
import org.apache.asterix.lang.common.statement.DatasetDecl;
import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -46,6 +47,7 @@
import org.apache.asterix.lang.common.statement.FunctionDropStatement;
import org.apache.asterix.lang.common.statement.IndexDropStatement;
import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
import org.apache.asterix.lang.common.statement.LoadStatement;
import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -255,4 +257,14 @@
public R visit(SynonymDropStatement del, T arg) throws CompilationException {
return null;
}
+
+ @Override
+ public R visit(CreateLibraryStatement cls, T arg) throws CompilationException {
+ return null;
+ }
+
+ @Override
+ public R visit(LibraryDropStatement del, T arg) throws CompilationException {
+ return null;
+ }
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
index 5a9d68f..2957509 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
@@ -50,6 +50,7 @@
import org.apache.asterix.lang.common.statement.CreateFeedStatement;
import org.apache.asterix.lang.common.statement.CreateFunctionStatement;
import org.apache.asterix.lang.common.statement.CreateIndexStatement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
import org.apache.asterix.lang.common.statement.CreateSynonymStatement;
import org.apache.asterix.lang.common.statement.DatasetDecl;
import org.apache.asterix.lang.common.statement.DataverseDecl;
@@ -63,6 +64,7 @@
import org.apache.asterix.lang.common.statement.FunctionDropStatement;
import org.apache.asterix.lang.common.statement.IndexDropStatement;
import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
import org.apache.asterix.lang.common.statement.LoadStatement;
import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
import org.apache.asterix.lang.common.statement.NodegroupDecl;
@@ -182,6 +184,10 @@
R visit(AdapterDropStatement del, T arg) throws CompilationException;
+ R visit(CreateLibraryStatement cls, T arg) throws CompilationException;
+
+ R visit(LibraryDropStatement del, T arg) throws CompilationException;
+
R visit(CreateSynonymStatement css, T arg) throws CompilationException;
R visit(SynonymDropStatement del, T arg) throws CompilationException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index e06e22f..7f50cab 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -962,6 +962,18 @@
}
@Override
+ public void updateLibrary(MetadataTransactionContext ctx, Library library) throws AlgebricksException {
+ try {
+ metadataNode.updateLibrary(ctx.getTxnId(), library);
+ } catch (RemoteException e) {
+ throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+ }
+ // reflect the library into the cache
+ ctx.dropLibrary(library.getDataverseName(), library.getName());
+ ctx.addLibrary(library);
+ }
+
+ @Override
public <T extends IExtensionMetadataEntity> void addEntity(MetadataTransactionContext mdTxnCtx, T entity)
throws AlgebricksException {
try {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index ca98328..6fd6376 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -525,73 +525,63 @@
dropSynonym(txnId, dataverseName, synonym.getSynonymName());
}
- // As a side effect, acquires an S lock on the 'Function' dataset
- // on behalf of txnId.
- List<Function> dataverseFunctions = getDataverseFunctions(txnId, dataverseName);
- // Drop all functions in this dataverse.
- for (Function function : dataverseFunctions) {
- dropFunction(txnId, new FunctionSignature(dataverseName, function.getName(), function.getArity()),
- true);
- }
-
- //Drop libraries, similarly.
- for (Library lib : getDataverseLibraries(txnId, dataverseName)) {
- dropLibrary(txnId, lib.getDataverseName(), lib.getName());
- }
-
- List<Dataset> dataverseDatasets;
- Dataset ds;
- dataverseDatasets = getDataverseDatasets(txnId, dataverseName);
- // Drop all datasets in this dataverse.
- for (int i = 0; i < dataverseDatasets.size(); i++) {
- ds = dataverseDatasets.get(i);
- dropDataset(txnId, dataverseName, ds.getDatasetName(), true);
- }
-
- // After dropping datasets, drop datatypes
- List<Datatype> dataverseDatatypes;
- // As a side effect, acquires an S lock on the 'datatype' dataset
- // on behalf of txnId.
- dataverseDatatypes = getDataverseDatatypes(txnId, dataverseName);
- // Drop all types in this dataverse.
- for (int i = 0; i < dataverseDatatypes.size(); i++) {
- forceDropDatatype(txnId, dataverseName, dataverseDatatypes.get(i).getDatatypeName());
- }
-
- // As a side effect, acquires an S lock on the 'Adapter' dataset
- // on behalf of txnId.
- List<DatasourceAdapter> dataverseAdapters = getDataverseAdapters(txnId, dataverseName);
- // Drop all functions in this dataverse.
- for (DatasourceAdapter adapter : dataverseAdapters) {
- dropAdapter(txnId, dataverseName, adapter.getAdapterIdentifier().getName());
- }
-
- List<Feed> dataverseFeeds;
- List<FeedConnection> feedConnections;
- Feed feed;
- dataverseFeeds = getDataverseFeeds(txnId, dataverseName);
- // Drop all feeds&connections in this dataverse.
- for (int i = 0; i < dataverseFeeds.size(); i++) {
- feed = dataverseFeeds.get(i);
- feedConnections = getFeedConnections(txnId, dataverseName, feed.getFeedName());
+ // Drop all feeds and connections in this dataverse.
+ // Feeds may depend on datatypes and adapters
+ List<Feed> dataverseFeeds = getDataverseFeeds(txnId, dataverseName);
+ for (Feed feed : dataverseFeeds) {
+ List<FeedConnection> feedConnections = getFeedConnections(txnId, dataverseName, feed.getFeedName());
for (FeedConnection feedConnection : feedConnections) {
dropFeedConnection(txnId, dataverseName, feed.getFeedName(), feedConnection.getDatasetName());
}
dropFeed(txnId, dataverseName, feed.getFeedName());
}
+ // Drop all feed ingestion policies in this dataverse.
List<FeedPolicyEntity> feedPolicies = getDataverseFeedPolicies(txnId, dataverseName);
- if (feedPolicies != null && !feedPolicies.isEmpty()) {
- // Drop all feed ingestion policies in this dataverse.
- for (FeedPolicyEntity feedPolicy : feedPolicies) {
- dropFeedPolicy(txnId, dataverseName, feedPolicy.getPolicyName());
- }
+ for (FeedPolicyEntity feedPolicy : feedPolicies) {
+ dropFeedPolicy(txnId, dataverseName, feedPolicy.getPolicyName());
+ }
+
+ // Drop all functions in this dataverse.
+ // Functions may depend on datatypes and libraries
+ // As a side effect, acquires an S lock on the 'Function' dataset on behalf of txnId.
+ List<Function> dataverseFunctions = getDataverseFunctions(txnId, dataverseName);
+ for (Function function : dataverseFunctions) {
+ dropFunction(txnId, new FunctionSignature(dataverseName, function.getName(), function.getArity()),
+ true);
+ }
+
+ // Drop all adapters in this dataverse.
+ // Adapters depend on libraries.
+ // As a side effect, acquires an S lock on the 'Adapter' dataset on behalf of txnId.
+ List<DatasourceAdapter> dataverseAdapters = getDataverseAdapters(txnId, dataverseName);
+ for (DatasourceAdapter adapter : dataverseAdapters) {
+ dropAdapter(txnId, dataverseName, adapter.getAdapterIdentifier().getName());
+ }
+
+ // Drop all libraries in this dataverse.
+ List<Library> dataverseLibraries = getDataverseLibraries(txnId, dataverseName);
+ for (Library lib : dataverseLibraries) {
+ dropLibrary(txnId, lib.getDataverseName(), lib.getName());
+ }
+
+ // Drop all datasets and indexes in this dataverse.
+ // Datasets depend on datatypes
+ List<Dataset> dataverseDatasets = getDataverseDatasets(txnId, dataverseName);
+ for (Dataset ds : dataverseDatasets) {
+ dropDataset(txnId, dataverseName, ds.getDatasetName(), true);
+ }
+
+ // Drop all types in this dataverse.
+ // As a side effect, acquires an S lock on the 'datatype' dataset on behalf of txnId.
+ List<Datatype> dataverseDatatypes = getDataverseDatatypes(txnId, dataverseName);
+ for (Datatype dataverseDatatype : dataverseDatatypes) {
+ forceDropDatatype(txnId, dataverseName, dataverseDatatype.getDatatypeName());
}
// Delete the dataverse entry from the 'dataverse' dataset.
+ // As a side effect, acquires an S lock on the 'dataverse' dataset on behalf of txnId.
ITupleReference searchKey = createTuple(dataverseName);
- // As a side effect, acquires an S lock on the 'dataverse' dataset
- // on behalf of txnId.
ITupleReference tuple = getTupleToBeDeleted(txnId, MetadataPrimaryIndexes.DATAVERSE_DATASET, searchKey);
deleteTupleFromIndex(txnId, MetadataPrimaryIndexes.DATAVERSE_DATASET, tuple);
} catch (HyracksDataException e) {
@@ -911,6 +901,19 @@
}
}
+ public List<DatasourceAdapter> getAllAdapters(TxnId txnId) throws AlgebricksException {
+ try {
+ DatasourceAdapterTupleTranslator tupleReaderWriter =
+ tupleTranslatorProvider.getAdapterTupleTranslator(false);
+ List<DatasourceAdapter> results = new ArrayList<>();
+ IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+ searchIndex(txnId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, null, valueExtractor, results);
+ return results;
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
private void confirmDataverseCanBeDeleted(TxnId txnId, DataverseName dataverseName) throws AlgebricksException {
// If a dataset from a DIFFERENT dataverse
// uses a type from this dataverse
@@ -991,6 +994,37 @@
}
}
+ private void confirmLibraryCanBeDeleted(TxnId txnId, DataverseName dataverseName, String libraryName)
+ throws AlgebricksException {
+ confirmLibraryIsUnusedByFunctions(txnId, dataverseName, libraryName);
+ confirmLibraryIsUnusedByAdapters(txnId, dataverseName, libraryName);
+ }
+
+ private void confirmLibraryIsUnusedByFunctions(TxnId txnId, DataverseName dataverseName, String libraryName)
+ throws AlgebricksException {
+ List<Function> functions = getAllFunctions(txnId);
+ for (Function function : functions) {
+ if (libraryName.equals(function.getLibrary()) && dataverseName.equals(function.getDataverseName())) {
+ throw new AlgebricksException(
+ "Cannot drop library " + dataverseName + '.' + libraryName + " being used by funciton "
+ + function.getDataverseName() + '.' + function.getName() + '@' + function.getArity());
+ }
+ }
+ }
+
+ private void confirmLibraryIsUnusedByAdapters(TxnId txnId, DataverseName dataverseName, String libraryName)
+ throws AlgebricksException {
+ List<DatasourceAdapter> adapters = getAllAdapters(txnId);
+ for (DatasourceAdapter adapter : adapters) {
+ if (libraryName.equals(adapter.getLibraryName())
+ && adapter.getAdapterIdentifier().getDataverseName().equals(dataverseName)) {
+ throw new AlgebricksException("Cannot drop library " + dataverseName + '.' + libraryName
+ + " being used by adapter " + adapter.getAdapterIdentifier().getDataverseName() + '.'
+ + adapter.getAdapterIdentifier().getName());
+ }
+ }
+ }
+
private void confirmDatatypeIsUnused(TxnId txnId, DataverseName dataverseName, String datatypeName)
throws AlgebricksException {
confirmDatatypeIsUnusedByDatatypes(txnId, dataverseName, datatypeName);
@@ -1582,10 +1616,8 @@
@Override
public void dropLibrary(TxnId txnId, DataverseName dataverseName, String libraryName) throws AlgebricksException {
- Library library = getLibrary(txnId, dataverseName, libraryName);
- if (library == null) {
- throw new AlgebricksException("Cannot drop library '" + library + "' because it doesn't exist.");
- }
+ confirmLibraryCanBeDeleted(txnId, dataverseName, libraryName);
+
try {
// Delete entry from the 'Library' dataset.
ITupleReference searchKey = createTuple(dataverseName, libraryName);
@@ -1597,7 +1629,7 @@
} catch (HyracksDataException e) {
if (e.getComponent().equals(ErrorCode.HYRACKS)
&& e.getErrorCode() == ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY) {
- throw new AlgebricksException("Cannot drop library '" + libraryName, e);
+ throw new AlgebricksException("Cannot drop library '" + libraryName + "' because it doesn't exist", e);
} else {
throw new AlgebricksException(e);
}
@@ -2026,6 +2058,27 @@
}
}
+ @Override
+ public void updateLibrary(TxnId txnId, Library library) throws AlgebricksException {
+ try {
+ // This method will delete previous entry of the library and insert the new one
+ // Delete entry from the 'library' dataset.
+ ITupleReference searchKey = createTuple(library.getDataverseName(), library.getName());
+ // Searches the index for the tuple to be deleted. Acquires an S
+ // lock on the 'library' dataset.
+ ITupleReference libraryTuple =
+ getTupleToBeDeleted(txnId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey);
+ deleteTupleFromIndex(txnId, MetadataPrimaryIndexes.LIBRARY_DATASET, libraryTuple);
+ // Previous tuple was deleted
+ // Insert into the 'library' dataset.
+ LibraryTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getLibraryTupleTranslator(true);
+ libraryTuple = tupleReaderWriter.getTupleFromMetadataEntity(library);
+ insertTupleIntoIndex(txnId, MetadataPrimaryIndexes.LIBRARY_DATASET, libraryTuple);
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
public ITxnIdFactory getTxnIdFactory() {
return txnIdFactory;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 0de138f..90fe3a5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -171,7 +171,7 @@
}
public void dropLibrary(DataverseName dataverseName, String libraryName) {
- Library library = new Library(dataverseName, libraryName);
+ Library library = new Library(dataverseName, libraryName, null, MetadataUtil.PENDING_NO_OP);
droppedCache.addLibraryIfNotExists(library);
logAndApply(new MetadataLogicalOperation(library, false));
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index fbacac1..09a4a94 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -703,6 +703,16 @@
void updateDataset(MetadataTransactionContext ctx, Dataset dataset) throws AlgebricksException;
/**
+ * update an existing library in metadata.
+ *
+ * @param ctx
+ * MetadataTransactionContext of an active metadata transaction.
+ * @param library
+ * Existing Library.
+ */
+ void updateLibrary(MetadataTransactionContext ctx, Library library) throws AlgebricksException;
+
+ /**
* Add an extension entity to its extension dataset under the ongoing metadata
* transaction
*
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index 8a052b7..0330008 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -803,6 +803,17 @@
void updateDataset(TxnId txnId, Dataset dataset) throws AlgebricksException, RemoteException;
/**
+ * update an existing library in the metadata, acquiring local locks on behalf
+ * of the given transaction id.
+ *
+ * @param txnId
+ * A globally unique id for an active metadata transaction.
+ * @param library
+ * updated Library instance.
+ */
+ void updateLibrary(TxnId txnId, Library library) throws AlgebricksException, RemoteException;
+
+ /**
* Adds an extension entity under the ongoing transaction job id
*
* @param txnId
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 1962559..a1d6743 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -75,7 +75,7 @@
public static final String FIELD_NAME_IS_PRIMARY = "IsPrimary";
public static final String FIELD_NAME_KIND = "Kind";
public static final String FIELD_NAME_LANGUAGE = "Language";
- public static final String FIELD_NAME_LIBRARY = "Library";
+ public static final String FIELD_NAME_LIBRARY_NAME = "LibraryName";
public static final String FIELD_NAME_LAST_REFRESH_TIME = "LastRefreshTime";
public static final String FIELD_NAME_METATYPE_DATAVERSE_NAME = "MetatypeDataverseName";
public static final String FIELD_NAME_METATYPE_NAME = "MetatypeName";
@@ -370,8 +370,6 @@
public static final int DATASOURCE_ADAPTER_ARECORD_TIMESTAMP_FIELD_INDEX = 4;
//open types
- public static final String DATASOURCE_ARECORD_FUNCTION_LIBRARY_FIELD_NAME = "Library";
-
public static final ARecordType DATASOURCE_ADAPTER_RECORDTYPE = createRecordType(
// RecordTypeName
RECORD_NAME_DATASOURCE_ADAPTER,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 3f5d0b0..8597a21 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -50,6 +50,7 @@
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
+import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -62,7 +63,6 @@
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.provider.AdapterFactoryProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedConstants;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -488,10 +488,10 @@
policyAccessor, factoryOutput.second);
break;
case EXTERNAL:
- String libraryName = feed.getConfiguration().get(ExternalDataConstants.KEY_ADAPTER_NAME).trim()
- .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
- feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, libraryName,
- adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second);
+ ExternalAdapterFactory extAdapterFactory = (ExternalAdapterFactory) adapterFactory;
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, extAdapterFactory.getLibraryDataverse(),
+ extAdapterFactory.getLibraryName(), extAdapterFactory.getClassName(), recordType,
+ policyAccessor, factoryOutput.second);
break;
default:
break;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
index f1a8f60..5e0ece3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
@@ -25,25 +25,27 @@
public class DatasourceAdapter implements IMetadataEntity<DatasourceAdapter> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final AdapterIdentifier adapterIdentifier;
private final String classname;
private final AdapterType type;
- private final String library;
+ //TODO:also need libraryDataverse
+ private final String libraryName;
public DatasourceAdapter(AdapterIdentifier adapterIdentifier, String classname, AdapterType type) {
this.adapterIdentifier = adapterIdentifier;
this.classname = classname;
this.type = type;
- this.library = null;
+ this.libraryName = null;
}
- public DatasourceAdapter(AdapterIdentifier adapterIdentifier, String classname, AdapterType type, String library) {
+ public DatasourceAdapter(AdapterIdentifier adapterIdentifier, String classname, AdapterType type,
+ String libraryName) {
this.adapterIdentifier = adapterIdentifier;
this.classname = classname;
this.type = type;
- this.library = library;
+ this.libraryName = libraryName;
}
@Override
@@ -68,8 +70,7 @@
return type;
}
- public String getLibrary() {
- return library;
+ public String getLibraryName() {
+ return libraryName;
}
-
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
index 87ead6a..494b5f1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
@@ -28,10 +28,14 @@
private final DataverseName dataverse;
private final String name;
+ private final String language;
+ private final int pendingOp;
- public Library(DataverseName dataverseName, String libraryName) {
+ public Library(DataverseName dataverseName, String libraryName, String language, int pendingOp) {
this.dataverse = dataverseName;
this.name = libraryName;
+ this.language = language;
+ this.pendingOp = pendingOp;
}
public DataverseName getDataverseName() {
@@ -42,6 +46,14 @@
return name;
}
+ public String getLanguage() {
+ return language;
+ }
+
+ public int getPendingOp() {
+ return pendingOp;
+ }
+
@Override
public Library addToCache(MetadataCache cache) {
return cache.addLibraryIfNotExists(this);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index eb37a60..6279451 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -19,8 +19,6 @@
package org.apache.asterix.metadata.entitytupletranslators;
-import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.DATASOURCE_ARECORD_FUNCTION_LIBRARY_FIELD_NAME;
-
import java.util.Calendar;
import org.apache.asterix.common.external.IDataSourceAdapter;
@@ -31,7 +29,6 @@
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -59,17 +56,12 @@
((AString) adapterRecord.getValueByPos(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_TYPE_FIELD_INDEX))
.getStringValue());
- String library = getAdapterLibrary(adapterRecord);
+ int libraryNameIdx = adapterRecord.getType().getFieldIndex(MetadataRecordTypes.FIELD_NAME_LIBRARY_NAME);
+ String libraryName =
+ libraryNameIdx >= 0 ? ((AString) adapterRecord.getValueByPos(libraryNameIdx)).getStringValue() : null;
return new DatasourceAdapter(new AdapterIdentifier(dataverseName, adapterName), classname, adapterType,
- library);
- }
-
- private String getAdapterLibrary(ARecord adapterRecord) {
- final ARecordType adapterType = adapterRecord.getType();
- final int adapterLibraryIdx = adapterType.getFieldIndex(DATASOURCE_ARECORD_FUNCTION_LIBRARY_FIELD_NAME);
- return adapterLibraryIdx >= 0 ? ((AString) adapterRecord.getValueByPos(adapterLibraryIdx)).getStringValue()
- : null;
+ libraryName);
}
@Override
@@ -121,30 +113,30 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+ // write open fields
+ writeOpenFields(adapter);
+
// write record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-
- writeOpenTypes(adapter);
-
return tuple;
}
- void writeOpenTypes(DatasourceAdapter adapter) throws HyracksDataException {
+ void writeOpenFields(DatasourceAdapter adapter) throws HyracksDataException {
writeLibrary(adapter);
}
protected void writeLibrary(DatasourceAdapter adapter) throws HyracksDataException {
- if (null == adapter.getLibrary()) {
+ if (adapter.getLibraryName() == null) {
return;
}
fieldName.reset();
- aString.setValue(DATASOURCE_ARECORD_FUNCTION_LIBRARY_FIELD_NAME);
+ aString.setValue(MetadataRecordTypes.FIELD_NAME_LIBRARY_NAME);
stringSerde.serialize(aString, fieldName.getDataOutput());
fieldValue.reset();
- aString.setValue(adapter.getLibrary());
+ aString.setValue(adapter.getLibraryName());
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(fieldName, fieldValue);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
index f521001..4792398 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
@@ -19,14 +19,21 @@
package org.apache.asterix.metadata.entitytupletranslators;
+import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_LANGUAGE;
+import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_PENDING_OP;
+
import java.util.Calendar;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
import org.apache.asterix.metadata.entities.Library;
+import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -52,7 +59,16 @@
((AString) libraryRecord.getValueByPos(MetadataRecordTypes.LIBRARY_ARECORD_NAME_FIELD_INDEX))
.getStringValue();
- return new Library(dataverseName, libraryName);
+ ARecordType libraryRecordType = libraryRecord.getType();
+ int pendingOpIdx = libraryRecordType.getFieldIndex(FIELD_NAME_PENDING_OP);
+ int pendingOp = pendingOpIdx >= 0 ? ((AInt32) libraryRecord.getValueByPos(pendingOpIdx)).getIntegerValue()
+ : MetadataUtil.PENDING_NO_OP;
+
+ int languageIdx = libraryRecordType.getFieldIndex(FIELD_NAME_LANGUAGE);
+ String language = languageIdx >= 0 ? ((AString) libraryRecord.getValueByPos(languageIdx)).getStringValue()
+ : ExternalFunctionLanguage.JAVA.name();
+
+ return new Library(dataverseName, libraryName, language, pendingOp);
}
@Override
@@ -90,6 +106,8 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.LIBRARY_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
+ writeOpenFields(library);
+
// write record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
@@ -97,4 +115,34 @@
tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
return tuple;
}
+
+ protected void writeOpenFields(Library library) throws HyracksDataException {
+ writeLanguage(library);
+ writePendingOp(library);
+ }
+
+ private void writeLanguage(Library library) throws HyracksDataException {
+ String language = library.getLanguage();
+
+ fieldName.reset();
+ aString.setValue(FIELD_NAME_LANGUAGE);
+ stringSerde.serialize(aString, fieldName.getDataOutput());
+ fieldValue.reset();
+ aString.setValue(language);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(fieldName, fieldValue);
+ }
+
+ private void writePendingOp(Library library) throws HyracksDataException {
+ int pendingOp = library.getPendingOp();
+
+ if (pendingOp != MetadataUtil.PENDING_NO_OP) {
+ fieldName.reset();
+ aString.setValue(FIELD_NAME_PENDING_OP);
+ stringSerde.serialize(aString, fieldName.getDataOutput());
+ fieldValue.reset();
+ int32Serde.serialize(new AInt32(pendingOp), fieldValue.getDataOutput());
+ recordBuilder.addField(fieldName, fieldValue);
+ }
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 24301e0..466778c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -32,12 +32,11 @@
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
-import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.api.IFeed;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.provider.AdapterFactoryProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -50,6 +49,7 @@
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.Library;
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -115,7 +115,7 @@
// Get adapter from metadata dataset <Metadata dataverse>
String adapterName = configuration.get(ExternalDataConstants.KEY_ADAPTER_NAME);
if (adapterName == null) {
- throw new AlgebricksException("cannot find adatper name");
+ throw new AlgebricksException("cannot find adapter name");
}
DatasourceAdapter adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
@@ -133,14 +133,7 @@
adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
break;
case EXTERNAL:
- String[] anameComponents = adapterName.split("#");
- String libraryName = anameComponents[0];
- ILibrary lib = appCtx.getLibraryManager().getLibrary(feed.getDataverseName(), libraryName);
- if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
- throw new HyracksDataException("Unexpected library language: " + lib.getLanguage());
- }
- ClassLoader cl = ((JavaLibrary) lib).getClassLoader();
- adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+ adapterFactory = createExternalAdapterFactory(mdTxnCtx, adapterEntity, adapterFactoryClassname);
break;
default:
throw new AsterixException("Unknown Adapter type " + adapterType);
@@ -174,6 +167,22 @@
}
}
+ private static ITypedAdapterFactory createExternalAdapterFactory(MetadataTransactionContext mdTxnCtx,
+ DatasourceAdapter adapterEntity, String adapterFactoryClassname)
+ throws AlgebricksException, RemoteException, HyracksDataException {
+ //TODO:library dataverse must be explicitly specified in the adapter entity
+ DataverseName libraryDataverse = adapterEntity.getAdapterIdentifier().getDataverseName();
+ String libraryName = adapterEntity.getLibraryName();
+ Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDataverse, libraryName);
+ if (library == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, libraryName);
+ }
+ if (!ExternalFunctionLanguage.JAVA.name().equals(library.getLanguage())) {
+ throw new HyracksDataException("Unexpected library language: " + library.getLanguage());
+ }
+ return new ExternalAdapterFactory(libraryDataverse, libraryName, adapterFactoryClassname);
+ }
+
@SuppressWarnings("rawtypes")
public static Triple<ITypedAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed,
FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx, ICcApplicationContext appCtx)
@@ -209,14 +218,7 @@
adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
break;
case EXTERNAL:
- String[] anameComponents = adapterName.split("#");
- String libraryName = anameComponents[0];
- ILibrary lib = appCtx.getLibraryManager().getLibrary(feed.getDataverseName(), libraryName);
- if (lib.getLanguage() != ExternalFunctionLanguage.JAVA) {
- throw new HyracksDataException("Unexpected library language: " + lib.getLanguage());
- }
- ClassLoader cl = ((JavaLibrary) lib).getClassLoader();
- adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+ adapterFactory = createExternalAdapterFactory(mdTxnCtx, adapterEntity, adapterFactoryClassname);
break;
default:
throw new AsterixException("Unknown Adapter type " + adapterType);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 6def41a..9099c88 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -46,7 +46,6 @@
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.external.IAdapterFactoryService;
-import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -74,7 +73,6 @@
private ICCServiceContext ccServiceCtx;
private IStorageComponentProvider storageComponentProvider;
private IGlobalRecoveryManager globalRecoveryManager;
- private ILibraryManager libraryManager;
private IResourceIdManager resourceIdManager;
private CompilerProperties compilerProperties;
private ExternalProperties externalProperties;
@@ -104,15 +102,14 @@
private final IAdapterFactoryService adapterFactoryService;
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
- ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
- IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
- IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
- IMetadataLockManager mdLockManager, IMetadataLockUtil mdLockUtil, IReceptionistFactory receptionistFactory,
+ Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
+ INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener,
+ IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager,
+ IMetadataLockUtil mdLockUtil, IReceptionistFactory receptionistFactory,
IConfigValidatorFactory configValidatorFactory, Object extensionManager,
IAdapterFactoryService adapterFactoryService) throws AlgebricksException, IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
- this.libraryManager = libraryManager;
this.activeLifeCycleListener = activeLifeCycleListener;
this.extensionManager = extensionManager;
// Determine whether to use old-style asterix-configuration.xml or new-style configuration.
@@ -219,11 +216,6 @@
}
@Override
- public ILibraryManager getLibraryManager() {
- return libraryManager;
- }
-
- @Override
public Object getExtensionManager() {
return extensionManager;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e813141..0ee3658 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -386,12 +386,10 @@
private static final long serialVersionUID = 1L;
private final List<URL> binaryURLs;
private final DeploymentId deploymentId;
- private final boolean extractFromArchive;
- public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId, boolean isExtractFromArchive) {
+ public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId) {
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
- this.extractFromArchive = isExtractFromArchive;
}
@Override
@@ -406,10 +404,6 @@
public DeploymentId getDeploymentId() {
return deploymentId;
}
-
- public boolean isExtractFromArchive() {
- return extractFromArchive;
- }
}
public static class CliUnDeployBinaryFunction extends Function {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index f295119..1118a68 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -92,8 +92,6 @@
*
* @param jobSpec
* Job Specification
- * @param jobFlags
- * Flags
* @throws Exception
*/
DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
@@ -168,13 +166,12 @@
/**
* Deploy files to the cluster
- *
- * @param files
- * a list of file paths
* @param deploymentId
* the id used to uniquely identify this set of files for management
+ * @param files
+ * a list of file paths
*/
- void deployBinary(DeploymentId deploymentId, List<String> files, boolean extractFromArchive) throws Exception;
+ void deployBinary(DeploymentId deploymentId, List<String> files) throws Exception;
/**
* undeploy a certain deployment
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 053276f..4cc47d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -57,8 +57,7 @@
public ClusterTopology getClusterTopology() throws Exception;
- public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId, boolean extractFromArchive)
- throws Exception;
+ public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
public void unDeployBinary(DeploymentId deploymentId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 962f826..75f4848 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -20,6 +20,7 @@
import java.io.Closeable;
import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -57,8 +58,12 @@
public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+ public void truncate(IFileHandle fileHandle, long size) throws HyracksDataException;
+
public long getSize(IFileHandle fileHandle);
+ public WritableByteChannel newWritableChannel(IFileHandle fileHandle);
+
public void deleteWorkspaceFiles() throws HyracksDataException;
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 09ecb15..6ad53ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -21,9 +21,11 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -132,4 +134,22 @@
return files;
}
+ public static void flushDirectory(File directory) throws IOException {
+ flushDirectory(directory.toPath());
+ }
+
+ public static void flushDirectory(Path path) throws IOException {
+ if (!Files.isDirectory(path)) {
+ throw new IOException("Not a directory: " + path);
+ }
+ if (Files.getFileStore(path).supportsFileAttributeView("posix")) {
+ try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
+ ch.force(true);
+ }
+ } else {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Unable to flush directory " + path);
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 25474769..e6973dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -165,7 +165,7 @@
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
(HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), dbf.getDeploymentId(),
- dbf.isExtractFromArchive(), new IPCResponder<>(handle, mid)));
+ new IPCResponder<>(handle, mid)));
break;
case CLI_UNDEPLOY_BINARY:
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/ApplicationInstallationHandler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/ApplicationInstallationHandler.java
index f400978..18a2817 100755
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/ApplicationInstallationHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/ApplicationInstallationHandler.java
@@ -28,6 +28,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -68,8 +69,9 @@
final String fileName = params[1];
final String rootDir = ccs.getServerContext().getBaseDir().toString();
- final String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + "applications/" + deployIdString
- : rootDir + File.separator + "/applications/" + File.separator + deployIdString;
+ final String deploymentDir = rootDir.endsWith(File.separator)
+ ? rootDir + ServerContext.APP_DIR_NAME + File.separator + deployIdString
+ : rootDir + File.separator + ServerContext.APP_DIR_NAME + File.separator + deployIdString;
final HttpMethod method = request.getHttpRequest().method();
try {
response.setStatus(HttpResponseStatus.OK);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
index 53f9360..4962607 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -47,15 +47,13 @@
private List<URL> binaryURLs;
private DeploymentId deploymentId;
private IPCResponder<DeploymentId> callback;
- private boolean extractFromArchive;
public CliDeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs, DeploymentId deploymentId,
- boolean extractFromArchive, IPCResponder<DeploymentId> callback) {
+ IPCResponder<DeploymentId> callback) {
this.ccs = ncs;
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
this.callback = callback;
- this.extractFromArchive = extractFromArchive;
}
@Override
@@ -68,7 +66,7 @@
* Deploy for the cluster controller
*/
DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getContext().getJobSerializerDeserializerContainer(),
- ccs.getServerContext(), false, extractFromArchive);
+ ccs.getServerContext(), false);
/**
* Deploy for the node controllers
@@ -84,7 +82,7 @@
* deploy binaries to each node controller
*/
for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) {
- ncs.getNodeController().deployBinary(deploymentId, binaryURLs, extractFromArchive);
+ ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
}
ccs.getExecutor().execute(new Runnable() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 7f172b6..42a0d66 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -51,7 +51,7 @@
void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
- void deployBinary(DeploymentId deploymentId, List<URL> url, boolean extractFromArchive) throws Exception;
+ void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
void undeployBinary(DeploymentId deploymentId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
index ef2777d..e1c4d59 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
@@ -24,6 +24,8 @@
public class ServerContext implements IServerContext {
+ public static final String APP_DIR_NAME = "applications";
+
private final ServerType type;
private final File baseDir;
private final File appDir;
@@ -31,7 +33,7 @@
public ServerContext(ServerType type, File baseDir) {
this.type = type;
this.baseDir = baseDir;
- this.appDir = new File(baseDir, "applications");
+ this.appDir = new File(baseDir, APP_DIR_NAME);
}
public ServerType getServerType() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
index e0150f7..ceb0da8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
@@ -22,20 +22,12 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Enumeration;
import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
@@ -48,7 +40,6 @@
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.context.ServerContext;
-import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -61,11 +52,6 @@
public static final String DEPLOYMENT = "applications";
private static final Logger LOGGER = LogManager.getLogger();
- private static final String SHIV_SUFFIX = ".pyz";
- private static final String ZIP_SUFFIX = ".zip";
- private static final String SHIV_ROOT = "SHIV_ROOT";
- private static final String SHIV_ENTRY_POINT = "SHIV_ENTRY_POINT";
- private static final String SHIV_INTERPRETER = "SHIV_INTERPRETER";
/**
* undeploy an existing deployment
@@ -102,12 +88,13 @@
* @param container
* the container of serailizer/deserializer
* @param ctx
- * the ServerContext * @param isNC
+ * the ServerContext
+ * @param isNC
* true is NC/false is CC
* @throws HyracksException
*/
public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
- ServerContext ctx, boolean isNC, boolean extractFromArchive) throws HyracksException {
+ ServerContext ctx, boolean isNC) throws HyracksException {
IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeserializer(deploymentId);
if (jobSerDe == null) {
jobSerDe = new ClassLoaderJobSerializerDeserializer();
@@ -116,11 +103,7 @@
String rootDir = ctx.getBaseDir().toString();
String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId
: rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId;
- if (extractFromArchive) {
- downloadURLs(urls, deploymentDir, isNC, true);
- } else {
- jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC, false));
- }
+ jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC));
}
/**
@@ -195,8 +178,7 @@
* @return a list of local file URLs
* @throws HyracksException
*/
- private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC,
- boolean extractFromArchive) throws HyracksException {
+ private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
//retry 10 times at maximum for downloading binaries
int retryCount = 10;
int tried = 0;
@@ -227,13 +209,6 @@
os.close();
}
}
- if (extractFromArchive) {
- if (targetFile.getAbsolutePath().endsWith(SHIV_SUFFIX)) {
- shiv(targetFile.getAbsolutePath(), deploymentDir);
- } else if (targetFile.getAbsolutePath().endsWith(ZIP_SUFFIX)) {
- unzip(targetFile.getAbsolutePath(), deploymentDir);
- }
- }
downloadedFileURLs.add(targetFile.toURI().toURL());
}
return downloadedFileURLs;
@@ -244,61 +219,4 @@
}
throw HyracksException.create(trace);
}
-
- public static void unzip(String sourceFile, String outputDirName) throws IOException {
- try (ZipFile zipFile = new ZipFile(sourceFile)) {
- Path outputPath = Paths.get(FilenameUtils.normalize(outputDirName));
- File outputDir = outputPath.toFile();
- if (!outputDir.exists()) {
- throw new IOException("Output path doesn't exist");
- }
- if (!outputDir.isDirectory()) {
- throw new IOException("Output path is not a directory");
- }
- Enumeration<? extends ZipEntry> entries = zipFile.entries();
- List<File> createdFiles = new ArrayList<>();
- while (entries.hasMoreElements()) {
- ZipEntry entry = entries.nextElement();
- String normalizedPath = FilenameUtils.normalize(FileUtil.joinPath(outputDirName, entry.getName()));
- Path candidatePath = Paths.get(normalizedPath);
- if (!candidatePath.startsWith(outputPath)) {
- throw new IOException("Malformed ZIP archive");
- }
- File entryDestination = new File(outputDir, entry.getName());
- if (!entry.isDirectory()) {
- entryDestination.getParentFile().mkdirs();
- try (InputStream in = zipFile.getInputStream(entry);
- OutputStream out = new FileOutputStream(entryDestination)) {
- createdFiles.add(entryDestination);
- IOUtils.copy(in, out);
- } catch (IOException e) {
- for (File f : createdFiles) {
- if (!f.delete()) {
- LOGGER.error("Couldn't clean up file after failed archive extraction: "
- + f.getAbsolutePath());
- }
- }
- throw e;
- }
- }
- }
- }
- }
-
- public static void shiv(String sourceFile, String outputDir) throws IOException {
- loadShim(outputDir, "pyro4.pyz");
- unzip(sourceFile, outputDir);
- unzip(outputDir + File.separator + "pyro4.pyz", outputDir);
- loadShim(outputDir, "entrypoint.py");
- }
-
- public static void loadShim(String outputDir, String name) throws IOException {
- try (InputStream is = DeploymentUtils.class.getClassLoader().getResourceAsStream(name)) {
- if (is != null) {
- IOUtils.copyLarge(is, new FileOutputStream(new File(outputDir + File.separator + name)));
- } else {
- throw new IOException("Classpath does not contain necessary Python resources!");
- }
- }
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 43b9003..2ba4768 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1176,14 +1176,11 @@
private final List<URL> binaryURLs;
private final DeploymentId deploymentId;
- private final boolean extractFromArchive;
- public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId,
- boolean isExtractFromArchive) {
+ public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
super(ccId);
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
- this.extractFromArchive = isExtractFromArchive;
}
@Override
@@ -1198,10 +1195,6 @@
public DeploymentId getDeploymentId() {
return deploymentId;
}
-
- public boolean isExtractFromArchive() {
- return extractFromArchive;
- }
}
public static class UnDeployBinaryFunction extends CCIdentifiedFunction {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index ea71bc9..d32ee32 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -92,9 +92,8 @@
}
@Override
- public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs, boolean extractFromArchive)
- throws Exception {
- DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId, extractFromArchive);
+ public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
+ DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId);
ipcHandle.send(-1, rpaf, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index df08c04..8959053 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -102,8 +102,8 @@
case DEPLOY_BINARY:
CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
- ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(),
- dbf.getCcId(), dbf.isExtractFromArchive()));
+ ncs.getWorkQueue()
+ .schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), dbf.getCcId()));
return;
case UNDEPLOY_BINARY:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index bab729f..1356f4c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -216,8 +216,8 @@
jobletMap = new ConcurrentHashMap<>();
deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
timer = new Timer(true);
- serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
- new File(ioManager.getWorkspacePath(0), id));
+ File ncBaseDir = ioManager.getWorkspacePath(0);
+ serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, ncBaseDir);
getNodeControllerInfosAcceptor = new MutableObject<>();
memoryManager =
new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index 9054377..57bda8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -75,7 +75,7 @@
ensureOpen();
}
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
if (raf == null) {
return;
}
@@ -101,4 +101,8 @@
}
}
}
+
+ public synchronized boolean isOpen() {
+ return raf != null && raf.getChannel().isOpen();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 6c35eff..da78a0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -25,6 +25,7 @@
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -365,6 +366,15 @@
}
@Override
+ public void truncate(IFileHandle fileHandle, long size) throws HyracksDataException {
+ try {
+ ((FileHandle) fileHandle).getFileChannel().truncate(size);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
public long getSize(IFileHandle fileHandle) {
return fileHandle.getFileReference().getFile().length();
}
@@ -435,4 +445,32 @@
}
return totalSize;
}
+
+ @Override
+ public WritableByteChannel newWritableChannel(IFileHandle fHandle) {
+ FileHandle fh = (FileHandle) fHandle;
+ if (!fh.isOpen()) {
+ throw new IllegalStateException("closed");
+ }
+ return new WritableByteChannel() {
+ long position;
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ int written = IOManager.this.syncWrite(fHandle, position, src);
+ position += written;
+ return written;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return fh.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOManager.this.close(fh);
+ }
+ };
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
index 586b539..e645d8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
@@ -38,19 +38,16 @@
*/
public class DeployBinaryWork extends AbstractWork {
- private DeploymentId deploymentId;
- private NodeControllerService ncs;
- private List<URL> binaryURLs;
+ private final DeploymentId deploymentId;
+ private final NodeControllerService ncs;
+ private final List<URL> binaryURLs;
private final CcId ccId;
- private final boolean extractFromArchive;
- public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId,
- boolean extractFromArchive) {
+ public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
this.deploymentId = deploymentId;
this.ncs = ncs;
this.binaryURLs = binaryURLs;
this.ccId = ccId;
- this.extractFromArchive = extractFromArchive;
}
@Override
@@ -58,7 +55,7 @@
DeploymentStatus status;
try {
DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getContext().getJobSerializerDeserializerContainer(),
- ncs.getServerContext(), true, extractFromArchive);
+ ncs.getServerContext(), true);
status = DeploymentStatus.SUCCEED;
} catch (Exception e) {
status = DeploymentStatus.FAIL;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
index 186fb0e..515b95c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.http.api;
+import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import org.apache.hyracks.http.server.HttpServer;
@@ -55,4 +56,10 @@
default IChannelClosedHandler getChannelClosedHandler(HttpServer server) {
return server.getChannelClosedHandler();
}
+
+ /**
+ * Called at server startup to initialize the servlet
+ */
+ default void init() throws IOException {
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
index 3977207..514a7dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
@@ -53,7 +53,7 @@
protected final String[] paths;
protected final ConcurrentMap<String, Object> ctx;
- private final int[] trims;
+ protected final int[] trims;
public AbstractServlet(ConcurrentMap<String, Object> ctx, String... paths) {
this.paths = paths;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 4cae09f..93762a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.http.server;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -224,7 +225,10 @@
return servlets.getServlets();
}
- protected void doStart() throws InterruptedException {
+ protected void doStart() throws InterruptedException, IOException {
+ for (IServlet servlet : servlets.getServlets()) {
+ servlet.init();
+ }
channel = bind();
}
@@ -372,4 +376,8 @@
public HttpServerConfig getConfig() {
return config;
}
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
index 840fa24..82d7a3b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
@@ -144,11 +144,9 @@
}
@Override
- public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId, boolean extractFromArchive)
- throws Exception {
+ public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception {
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
- new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId,
- extractFromArchive);
+ new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId);
rpci.call(ipcHandle, dbf);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index fa6f2f0..9351348 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -217,13 +217,12 @@
public DeploymentId deployBinary(List<String> files) throws Exception {
/** generate a deployment id */
DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString());
- deployBinary(deploymentId, files, false);
+ deployBinary(deploymentId, files);
return deploymentId;
}
@Override
- public void deployBinary(DeploymentId deploymentId, List<String> files, boolean extractFromArchive)
- throws Exception {
+ public void deployBinary(DeploymentId deploymentId, List<String> files) throws Exception {
List<URL> binaryURLs = new ArrayList<>();
if (files != null && !files.isEmpty()) {
CloseableHttpClient hc = new DefaultHttpClient();
@@ -250,7 +249,7 @@
}
}
/** deploy the URLs to the CC and NCs */
- hci.deployBinary(binaryURLs, deploymentId, extractFromArchive);
+ hci.deployBinary(binaryURLs, deploymentId);
}
@Override