[NO ISSUE] Allow UDF Requests from NC
- Route UDF requests on NCs to the CC
- Enable library tests to round-robin betwen NC and CC
Change-Id: I16557c2efb4622c9639c2992c8b2ef0624bd650e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8186
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
new file mode 100644
index 0000000..6d7a847
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.CreateLibraryRequestMessage;
+import org.apache.asterix.app.message.DropLibraryRequestMessage;
+import org.apache.asterix.app.message.InternalRequestResponse;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.control.common.context.ServerContext;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpScheme;
+
+public class NCUdfApiServlet extends UdfApiServlet {
+
+ INcApplicationContext appCtx;
+ INCServiceContext srvCtx;
+
+ public NCUdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+ ILangCompilationProvider compilationProvider, HttpScheme httpServerProtocol, int httpServerPort) {
+ super(ctx, paths, appCtx, compilationProvider, null, null, httpServerProtocol, httpServerPort);
+ }
+
+ @Override
+ public void init() throws IOException {
+ appCtx = (INcApplicationContext) plainAppCtx;
+ srvCtx = this.appCtx.getServiceContext();
+ workingDir = Paths.get(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath()).normalize()
+ .resolve(Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME,
+ "tmp"));
+ initAuth();
+ initStorage();
+ }
+
+ @Override
+ protected void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language,
+ URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference,
+ IServletRequest request, IServletResponse response) throws Exception {
+ INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
+ MessageFuture responseFuture = ncMb.registerMessageFuture();
+ CreateLibraryRequestMessage req = new CreateLibraryRequestMessage(srvCtx.getNodeId(),
+ responseFuture.getFutureId(), dataverseName, libraryName, language, downloadURI, replaceIfExists,
+ sysAuthHeader, requestReference, additionalHttpHeadersFromRequest(request));
+ sendMessage(req, responseFuture, requestReference, request, response);
+ }
+
+ @Override
+ protected void doDrop(DataverseName dataverseName, String libraryName, boolean replaceIfExists,
+ IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception {
+ INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
+ MessageFuture responseFuture = ncMb.registerMessageFuture();
+ DropLibraryRequestMessage req =
+ new DropLibraryRequestMessage(srvCtx.getNodeId(), responseFuture.getFutureId(), dataverseName,
+ libraryName, replaceIfExists, requestReference, additionalHttpHeadersFromRequest(request));
+ sendMessage(req, responseFuture, requestReference, request, response);
+ }
+
+ private void sendMessage(ICcAddressedMessage requestMessage, MessageFuture responseFuture,
+ IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception {
+ // Running on NC -> send 'execute' message to CC
+ INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
+ InternalRequestResponse responseMsg;
+ try {
+ ncMb.sendMessageToPrimaryCC(requestMessage);
+ responseMsg = (InternalRequestResponse) responseFuture.get(120000, TimeUnit.MILLISECONDS);
+
+ } finally {
+ ncMb.deregisterMessageFuture(responseFuture.getFutureId());
+ }
+
+ Throwable err = responseMsg.getError();
+ if (err != null) {
+ if (err instanceof Error) {
+ throw (Error) err;
+ } else if (err instanceof Exception) {
+ throw (Exception) err;
+ } else {
+ throw new Exception(err.toString(), err);
+ }
+ }
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response) throws Exception {
+ String localPath = localPath(request);
+ while (localPath.startsWith("/")) {
+ localPath = localPath.substring(1);
+ }
+ if (localPath.isEmpty()) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ Path filePath = workingDir.resolve(localPath).normalize();
+ if (!filePath.startsWith(workingDir)) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ readFromFile(filePath, response);
+ }
+
+ @Override
+ protected void readFromFile(Path filePath, IServletResponse response) throws Exception {
+ class InputStreamGetter extends SynchronizableWork {
+ private InputStream is;
+
+ @Override
+ protected void doRun() throws Exception {
+ is = Files.newInputStream(filePath);
+ }
+ }
+
+ InputStreamGetter r = new InputStreamGetter();
+ ((NodeControllerService) srvCtx.getControllerService()).getWorkQueue().scheduleAndSync(r);
+
+ if (r.is == null) {
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ return;
+ }
+ try {
+ response.setStatus(HttpResponseStatus.OK);
+ HttpUtil.setContentType(response, "application/octet-stream");
+ IOUtils.copyLarge(r.is, response.outputStream());
+ } finally {
+ r.is.close();
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
index 18139f6..360f5a0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
@@ -38,6 +38,7 @@
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IRequestReference;
@@ -91,8 +92,9 @@
private static final Logger LOGGER = LogManager.getLogger();
- protected final ICcApplicationContext appCtx;
- private final ClusterControllerService ccs;
+ protected final IApplicationContext plainAppCtx;
+ private ICcApplicationContext appCtx;
+ private ClusterControllerService ccs;
private final HttpScheme httpServerProtocol;
private final int httpServerPort;
@@ -100,29 +102,30 @@
protected final IStatementExecutorFactory statementExecutorFactory;
protected final IStorageComponentProvider componentProvider;
protected final IReceptionist receptionist;
- protected final Path workingDir;
+ protected Path workingDir;
protected String sysAuthHeader;
- public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx,
+ public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
IStorageComponentProvider componentProvider, HttpScheme httpServerProtocol, int httpServerPort) {
super(ctx, paths);
- this.appCtx = appCtx;
- ICCServiceContext srvCtx = appCtx.getServiceContext();
- this.ccs = (ClusterControllerService) srvCtx.getControllerService();
+ this.plainAppCtx = appCtx;
this.compilationProvider = compilationProvider;
this.statementExecutorFactory = statementExecutorFactory;
this.componentProvider = componentProvider;
this.receptionist = appCtx.getReceptionist();
this.httpServerProtocol = httpServerProtocol;
this.httpServerPort = httpServerPort;
- File baseDir = srvCtx.getServerCtx().getBaseDir();
- this.workingDir = baseDir.getAbsoluteFile().toPath().normalize().resolve(
- Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, "tmp"));
}
@Override
public void init() throws IOException {
+ appCtx = (ICcApplicationContext) plainAppCtx;
+ ICCServiceContext srvCtx = this.appCtx.getServiceContext();
+ this.ccs = (ClusterControllerService) srvCtx.getControllerService();
+ File baseDir = srvCtx.getServerCtx().getBaseDir();
+ this.workingDir = baseDir.getAbsoluteFile().toPath().normalize().resolve(
+ Paths.get(ServerContext.APP_DIR_NAME, ExternalLibraryManager.LIBRARY_MANAGER_BASE_DIR_NAME, "tmp"));
initAuth();
initStorage();
}
@@ -151,11 +154,6 @@
@Override
protected void post(IServletRequest request, IServletResponse response) {
- IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
- if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
- response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
- return;
- }
HttpRequest httpRequest = request.getHttpRequest();
Pair<DataverseName, String> libraryName = parseLibraryName(request);
if (libraryName == null) {
@@ -190,9 +188,8 @@
}
fileUpload.renameTo(libraryTempFile.toFile());
URI downloadURI = createDownloadURI(libraryTempFile);
- CreateLibraryStatement stmt = new CreateLibraryStatement(libraryName.first, libraryName.second,
- language, downloadURI, true, sysAuthHeader);
- executeStatement(stmt, requestReference, request);
+ doCreate(libraryName.first, libraryName.second, language, downloadURI, true, sysAuthHeader,
+ requestReference, request, response);
response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
response.setStatus(toHttpErrorStatus(e));
@@ -213,6 +210,14 @@
}
}
+ protected void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language,
+ URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference,
+ IServletRequest request, IServletResponse response) throws Exception {
+ CreateLibraryStatement stmt = new CreateLibraryStatement(dataverseName, libraryName, language, downloadURI,
+ replaceIfExists, sysAuthHeader);
+ executeStatement(stmt, requestReference, request, response);
+ }
+
protected URI createDownloadURI(Path file) throws Exception {
String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName();
String host = getHyracksClientConnection().getHost();
@@ -220,11 +225,6 @@
}
protected void delete(IServletRequest request, IServletResponse response) {
- IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
- if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
- response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
- return;
- }
Pair<DataverseName, String> libraryName = parseLibraryName(request);
if (libraryName == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
@@ -232,8 +232,7 @@
}
try {
IRequestReference requestReference = receptionist.welcome(request);
- LibraryDropStatement stmt = new LibraryDropStatement(libraryName.first, libraryName.second, false);
- executeStatement(stmt, requestReference, request);
+ doDrop(libraryName.first, libraryName.second, false, requestReference, request, response);
response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
response.setStatus(toHttpErrorStatus(e));
@@ -244,8 +243,19 @@
}
}
- protected void executeStatement(Statement statement, IRequestReference requestReference, IServletRequest request)
- throws Exception {
+ protected void doDrop(DataverseName dataverseName, String libraryName, boolean replaceIfExists,
+ IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception {
+ LibraryDropStatement stmt = new LibraryDropStatement(dataverseName, libraryName, replaceIfExists);
+ executeStatement(stmt, requestReference, request, response);
+ }
+
+ protected void executeStatement(Statement statement, IRequestReference requestReference, IServletRequest request,
+ IServletResponse response) throws Exception {
+ IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
+ if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+ response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+ return;
+ }
SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM),
new PrintWriter(NullWriter.NULL_WRITER));
ResponsePrinter printer = new ResponsePrinter(sessionOutput);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java
new file mode 100644
index 0000000..26aac63
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.app.result.ResponsePrinter;
+import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.commons.io.output.NullWriter;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractInternalRequestMessage implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ final String nodeRequestId;
+ final long requestMessageId;
+ final IRequestReference requestReference;
+ final Map<String, String> additionalParams;
+
+ public AbstractInternalRequestMessage(String nodeRequestId, long requestMessageId,
+ IRequestReference requestReference, Map<String, String> additionalParams) {
+ this.nodeRequestId = nodeRequestId;
+ this.requestMessageId = requestMessageId;
+ this.requestReference = requestReference;
+ this.additionalParams = additionalParams;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext ccAppCtx) throws HyracksDataException {
+ ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext();
+ ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
+ CCApplication ccApp = (CCApplication) ccSrv.getApplication();
+ CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
+ final RuntimeDataException rejectionReason =
+ ExecuteStatementRequestMessage.getRejectionReason(ccSrv, nodeRequestId);
+ if (rejectionReason != null) {
+ ExecuteStatementRequestMessage.sendRejection(rejectionReason, messageBroker, requestMessageId,
+ nodeRequestId);
+ return;
+ }
+ CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
+ ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(ILangExtension.Language.SQLPP);
+ IStorageComponentProvider componentProvider = ccAppCtx.getStorageComponentProvider();
+ IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
+ InternalRequestResponse responseMsg = new InternalRequestResponse(requestMessageId);
+ SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM),
+ new PrintWriter(NullWriter.NULL_WRITER));
+ ResponsePrinter printer = new ResponsePrinter(sessionOutput);
+ ResultProperties resultProperties = new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE, 1);
+ IRequestParameters requestParams = new RequestParameters(requestReference, "", null, resultProperties,
+ new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null,
+ additionalParams, Collections.emptyMap(), false);
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator =
+ statementExecutorFactory.create(ccAppCtx, Collections.singletonList(produceStatement()), sessionOutput,
+ compilationProvider, componentProvider, printer);
+ try {
+ translator.compileAndExecute(ccAppCtx.getHcc(), requestParams);
+ } catch (AlgebricksException | HyracksException | org.apache.asterix.lang.sqlpp.parser.TokenMgrError pe) {
+ // we trust that "our" exceptions are serializable and have a comprehensible error message
+ GlobalConfig.ASTERIX_LOGGER.log(Level.WARN, pe.getMessage(), pe);
+ responseMsg.setError(pe);
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, "Unexpected exception", e);
+ responseMsg.setError(e);
+ }
+ try {
+ messageBroker.sendApplicationMessageToNC(responseMsg, nodeRequestId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, e.toString(), e);
+ }
+
+ }
+
+ protected abstract Statement produceStatement();
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java
new file mode 100644
index 0000000..818a098
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.CreateLibraryStatement;
+
+public final class CreateLibraryRequestMessage extends AbstractInternalRequestMessage {
+
+ final DataverseName dataverseName;
+ final String libraryName;
+ final ExternalFunctionLanguage lang;
+ final URI location;
+ final boolean replaceIfExists;
+ final String authToken;
+ private static final long serialVersionUID = 1L;
+
+ public CreateLibraryRequestMessage(String nodeRequestId, long requestMessageId, DataverseName dataverseName,
+ String libraryName, ExternalFunctionLanguage lang, URI location, boolean replaceIfExists, String authToken,
+ IRequestReference requestReference, Map<String, String> additionalParams) {
+ super(nodeRequestId, requestMessageId, requestReference, additionalParams);
+ this.dataverseName = dataverseName;
+ this.libraryName = libraryName;
+ this.lang = lang;
+ this.location = location;
+ this.replaceIfExists = replaceIfExists;
+ this.authToken = authToken;
+ }
+
+ protected Statement produceStatement() {
+ return new CreateLibraryStatement(dataverseName, libraryName, lang, location, replaceIfExists, authToken);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DropLibraryRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DropLibraryRequestMessage.java
new file mode 100644
index 0000000..e7e8931
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DropLibraryRequestMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.LibraryDropStatement;
+
+public final class DropLibraryRequestMessage extends AbstractInternalRequestMessage {
+
+ final DataverseName dataverseName;
+ final String libraryName;
+ final boolean ifExists;
+ private static final long serialVersionUID = 1L;
+
+ public DropLibraryRequestMessage(String nodeRequestId, long requestMessageId, DataverseName dataverseName,
+ String libraryName, boolean ifExists, IRequestReference requestReference,
+ Map<String, String> additionalParams) {
+ super(nodeRequestId, requestMessageId, requestReference, additionalParams);
+ this.dataverseName = dataverseName;
+ this.libraryName = libraryName;
+ this.ifExists = ifExists;
+ }
+
+ @Override
+ protected Statement produceStatement() {
+ return new LibraryDropStatement(dataverseName, libraryName, ifExists);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index a6ecc33..2552040 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -114,9 +114,9 @@
ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
CCApplication ccApp = (CCApplication) ccSrv.getApplication();
CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
- final RuntimeDataException rejectionReason = getRejectionReason(ccSrv);
+ final RuntimeDataException rejectionReason = getRejectionReason(ccSrv, requestNodeId);
if (rejectionReason != null) {
- sendRejection(rejectionReason, messageBroker);
+ sendRejection(rejectionReason, messageBroker, requestMessageId, requestNodeId);
return;
}
CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
@@ -176,7 +176,7 @@
}
}
- private RuntimeDataException getRejectionReason(ClusterControllerService ccSrv) {
+ static RuntimeDataException getRejectionReason(ClusterControllerService ccSrv, String requestNodeId) {
if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) {
return new RuntimeDataException(ErrorCode.REJECT_NODE_UNREGISTERED);
}
@@ -189,7 +189,8 @@
return null;
}
- private void sendRejection(RuntimeDataException reason, CCMessageBroker messageBroker) {
+ static void sendRejection(RuntimeDataException reason, CCMessageBroker messageBroker, long requestMessageId,
+ String requestNodeId) {
ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
responseMsg.setError(reason);
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/InternalRequestResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/InternalRequestResponse.java
new file mode 100644
index 0000000..3083efc
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/InternalRequestResponse.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class InternalRequestResponse implements INcAddressedMessage {
+
+ private final long requestMessageId;
+ private Throwable error;
+ private static final long serialVersionUID = 1L;
+
+ public InternalRequestResponse(long requestMessageId) {
+ this.requestMessageId = requestMessageId;
+ }
+
+ public void setError(Throwable error) {
+ this.error = error;
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ MessageFuture future = mb.deregisterMessageFuture(requestMessageId);
+ if (future != null) {
+ future.complete(this);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 3d5eb47..771c007 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -277,7 +277,8 @@
ccServiceCtx.getControllerService().getExecutor());
jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ccServiceCtx);
jsonAPIServer.setAttribute(ServletConstants.CREDENTIAL_MAP,
- parseCredentialMap(externalProperties.getCredentialFilePath()));
+ parseCredentialMap(((ClusterControllerService) (appCtx.getServiceContext().getControllerService()))
+ .getCCConfig().getCredentialFilePath()));
// Other APIs.
addServlet(jsonAPIServer, Servlets.QUERY_STATUS);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index e90976e..fd5ecb5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -22,15 +22,22 @@
import static org.apache.asterix.common.utils.Servlets.QUERY_RESULT;
import static org.apache.asterix.common.utils.Servlets.QUERY_SERVICE;
import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS;
+import static org.apache.asterix.common.utils.Servlets.UDF;
+import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.server.BasicAuthServlet;
import org.apache.asterix.api.http.server.NCQueryServiceServlet;
+import org.apache.asterix.api.http.server.NCUdfApiServlet;
import org.apache.asterix.api.http.server.NetDiagnosticsApiServlet;
import org.apache.asterix.api.http.server.QueryResultApiServlet;
import org.apache.asterix.api.http.server.QueryStatusApiServlet;
@@ -73,6 +80,9 @@
import org.apache.asterix.translator.Receptionist;
import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.asterix.utils.RedactionUtil;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IConfigManager;
@@ -204,6 +214,12 @@
ncExtensionManager.getCompilationProvider(ILangExtension.Language.SQLPP);
apiServer.addServlet(new NCQueryServiceServlet(apiServer.ctx(), new String[] { QUERY_SERVICE },
getApplicationContext(), sqlppCompilationProvider.getLanguage(), sqlppCompilationProvider, null));
+ apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP,
+ parseCredentialMap(((NodeControllerService) ncServiceCtx.getControllerService()).getConfiguration()
+ .getCredentialFilePath()));
+ apiServer.addServlet(new BasicAuthServlet(apiServer.ctx(),
+ new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, getApplicationContext(),
+ sqlppCompilationProvider, apiServer.getScheme(), apiServer.getAddress().getPort())));
apiServer.addServlet(new QueryStatusApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_STATUS));
apiServer.addServlet(new QueryResultApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_RESULT));
webManager.add(apiServer);
@@ -340,4 +356,24 @@
protected void configurePersistedResourceRegistry() {
ncServiceCtx.setPersistedResourceRegistry(new PersistedResourceRegistry());
}
+
+ private Map<String, String> parseCredentialMap(String credPath) {
+ File credentialFile = new File(credPath);
+ Map<String, String> storedCredentials = new HashMap<>();
+ if (credentialFile.exists()) {
+ try (CSVParser p =
+ CSVParser.parse(credentialFile, Charset.defaultCharset(), CSVFormat.DEFAULT.withDelimiter(':'))) {
+ List<CSVRecord> recs = p.getRecords();
+ for (CSVRecord r : recs) {
+ if (r.size() != 2) {
+ throw new IOException("Passwd file must have exactly two fields.");
+ }
+ storedCredentials.put(r.get(0), r.get(1));
+ }
+ } catch (IOException e) {
+ LOGGER.error("Malformed credential file", e);
+ }
+ }
+ return storedCredentials;
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
index 2dc7326..c2b576a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
@@ -20,7 +20,7 @@
import java.io.File;
import java.io.IOException;
-import java.net.URL;
+import java.net.URI;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.commons.io.IOUtils;
@@ -48,25 +48,15 @@
public class ExternalUDFLibrarian implements IExternalUDFLibrarian {
private HttpClient hc;
- private String host;
- private int port;
-
- private ExternalUDFLibrarian(String host, int port) {
- hc = new DefaultHttpClient();
- this.host = host;
- this.port = port;
- }
public ExternalUDFLibrarian() {
- this("localhost", 19002);
+ hc = new DefaultHttpClient();
}
@Override
- public void install(String dataverse, String libName, String libPath, Pair<String, String> credentials)
- throws Exception {
- URL url = new URL("http", host, port, "/admin/udf/" + dataverse + "/" + libName);
- HttpHost h = new HttpHost(host, port, "http");
- HttpPost post = new HttpPost(url.toString());
+ public void install(URI path, String libPath, Pair<String, String> credentials) throws Exception {
+ HttpHost h = new HttpHost(path.getHost(), path.getPort(), "http");
+ HttpPost post = new HttpPost(path);
CredentialsProvider cp = new BasicCredentialsProvider();
cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(credentials.first, credentials.second));
HttpClientContext hcCtx = HttpClientContext.create();
@@ -86,18 +76,16 @@
}
@Override
- public void uninstall(String dataverse, String libName, Pair<String, String> credentials)
- throws IOException, AsterixException {
- URL url = new URL("http", host, port, "/admin/udf/" + dataverse + "/" + libName);
+ public void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException {
CredentialsProvider cp = new BasicCredentialsProvider();
cp.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(credentials.first, credentials.second));
HttpClientContext hcCtx = HttpClientContext.create();
hcCtx.setCredentialsProvider(cp);
- HttpHost h = new HttpHost(host, port, "http");
+ HttpHost h = new HttpHost(path.getHost(), path.getPort(), "http");
AuthCache ac = new BasicAuthCache();
ac.put(h, new BasicScheme());
hcCtx.setAuthCache(ac);
- HttpDelete del = new HttpDelete(url.toString());
+ HttpDelete del = new HttpDelete(path);
HttpResponse response = hc.execute(del, hcCtx);
String resp = null;
int respCode = response.getStatusLine().getStatusCode();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
index 1933f24..2315bec 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
@@ -19,13 +19,14 @@
package org.apache.asterix.app.external;
import java.io.IOException;
+import java.net.URI;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.hyracks.algebricks.common.utils.Pair;
public interface IExternalUDFLibrarian {
- void install(String dataverse, String libName, String libPath, Pair<String, String> credentials) throws Exception;
- void uninstall(String dataverse, String libName, Pair<String, String> credentials)
- throws IOException, AsterixException;
+ void install(URI path, String libPath, Pair<String, String> credentials) throws Exception;
+
+ void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index f535f2c..e67b900 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1209,13 +1209,15 @@
throw new Exception("invalid library format");
}
String libPath = command[5];
- librarian.install(dataverse, library, libPath, new Pair<>(username, pw));
+ URI create = createEndpointURI("/admin/udf/" + dataverse + "/" + library);
+ librarian.install(create, libPath, new Pair<>(username, pw));
break;
case "uninstall":
if (command.length != 5) {
throw new Exception("invalid library format");
}
- librarian.uninstall(dataverse, library, new Pair<>(username, pw));
+ URI delete = createEndpointURI("/admin/udf/" + dataverse + "/" + library);
+ librarian.uninstall(delete, new Pair<>(username, pw));
break;
default:
throw new Exception("invalid library format");
@@ -2229,7 +2231,8 @@
protected URI createEndpointURI(String pathAndQuery) throws URISyntaxException {
InetSocketAddress endpoint;
- if (!ncEndPointsList.isEmpty() && pathAndQuery.equals(Servlets.QUERY_SERVICE)) {
+ if (!ncEndPointsList.isEmpty() && (pathAndQuery.equals(Servlets.QUERY_SERVICE)
+ || pathAndQuery.startsWith(Servlets.getAbsolutePath(Servlets.UDF)))) {
int endpointIdx = Math.abs(endpointSelector++ % ncEndPointsList.size());
endpoint = ncEndPointsList.get(endpointIdx);
} else if (isCcEndPointPath(pathAndQuery)) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionIT.java
index 39f0948..91f7b05 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionIT.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionIT.java
@@ -19,10 +19,16 @@
package org.apache.asterix.test.runtime;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -39,7 +45,9 @@
@BeforeClass
public static void setUp() throws Exception {
- LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ final TestExecutor testExecutor = new TestExecutor();
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ setNcEndpoints(testExecutor);
}
@AfterClass
@@ -62,4 +70,17 @@
public void test() throws Exception {
LangExecutionUtil.test(tcCtx);
}
+
+ private static void setNcEndpoints(TestExecutor testExecutor) {
+ final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+ final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ }
}
diff --git a/asterixdb/asterix-app/src/test/resources/cc.conf b/asterixdb/asterix-app/src/test/resources/cc.conf
index 119b53a..51ee756 100644
--- a/asterixdb/asterix-app/src/test/resources/cc.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc.conf
@@ -32,6 +32,7 @@
#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
[nc]
+credential.file=src/test/resources/security/passwd
address=127.0.0.1
command=asterixnc
app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 033b751..8f83b35 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -23,15 +23,10 @@
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
import static org.apache.hyracks.control.common.config.OptionTypes.UNSIGNED_INTEGER;
-import java.util.function.Function;
-
-import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
-import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.Level;
public class ExternalProperties extends AbstractProperties {
@@ -54,12 +49,7 @@
UNSIGNED_INTEGER,
StorageUtil.getIntSizeInBytes(200, StorageUtil.StorageUnit.MEGABYTE),
"The maximum accepted web request size in bytes"),
- REQUESTS_ARCHIVE_SIZE(UNSIGNED_INTEGER, 50, "The maximum number of archived requests to maintain"),
- CREDENTIAL_FILE(
- STRING,
- (Function<IApplicationConfig, String>) appConfig -> FileUtil
- .joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "passwd"),
- ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/passwd");
+ REQUESTS_ARCHIVE_SIZE(UNSIGNED_INTEGER, 50, "The maximum number of archived requests to maintain");
private final IOptionType type;
private final Object defaultValue;
@@ -79,7 +69,6 @@
case API_PORT:
case ACTIVE_PORT:
case REQUESTS_ARCHIVE_SIZE:
- case CREDENTIAL_FILE:
return Section.CC;
case NC_API_PORT:
return Section.NC;
@@ -159,7 +148,4 @@
return accessor.getInt(Option.REQUESTS_ARCHIVE_SIZE);
}
- public String getCredentialFilePath() {
- return accessor.getString(Option.CREDENTIAL_FILE);
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index bd84f11..b0ee497 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.OptionTypes;
import org.apache.hyracks.util.file.FileUtil;
import org.ini4j.Ini;
@@ -78,7 +79,12 @@
CONTROLLER_ID(SHORT, (short) 0x0000),
KEY_STORE_PATH(STRING),
TRUST_STORE_PATH(STRING),
- KEY_STORE_PASSWORD(STRING);
+ KEY_STORE_PASSWORD(STRING),
+ CREDENTIAL_FILE(
+ OptionTypes.STRING,
+ (Function<IApplicationConfig, String>) appConfig -> FileUtil
+ .joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "passwd"),
+ ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/passwd");
private final IOptionType parser;
private Object defaultValue;
@@ -198,6 +204,8 @@
return "A fully-qualified path to a trust store file that will be used for secured connections";
case KEY_STORE_PASSWORD:
return "The password to the provided key store";
+ case CREDENTIAL_FILE:
+ return "Path to HTTP basic credentials";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -465,7 +473,12 @@
}
public void setTrustStorePath(String trustStorePath) {
+
configManager.set(Option.TRUST_STORE_PATH, trustStorePath);
}
+ public String getCredentialFilePath() {
+ return getAppConfig().getString(Option.CREDENTIAL_FILE);
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
index f7291cb..9a505d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
@@ -132,4 +132,5 @@
public boolean isSslEnabled() {
return getAppConfig().getBoolean(Option.SSL_ENABLED);
}
-}
+
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index eaf0418..71c33d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.OptionTypes;
import org.apache.hyracks.util.file.FileUtil;
public class NCConfig extends ControllerConfig {
@@ -95,7 +96,12 @@
PYTHON_CMD(STRING, (String) null),
PYTHON_ADDITIONAL_PACKAGES(STRING_ARRAY, (String[]) null),
PYTHON_USE_BUNDLED_MSGPACK(BOOLEAN, true),
- PYTHON_ARGS(STRING_ARRAY, (String[]) null);
+ PYTHON_ARGS(STRING_ARRAY, (String[]) null),
+ CREDENTIAL_FILE(
+ OptionTypes.STRING,
+ (Function<IApplicationConfig, String>) appConfig -> FileUtil
+ .joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "passwd"),
+ ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/passwd");
private final IOptionType parser;
private final String defaultValueDescription;
@@ -236,6 +242,8 @@
return "True to include bundled msgpack on Python sys.path, false to use system-provided msgpack";
case PYTHON_ARGS:
return "Python args to pass to Python interpreter";
+ case CREDENTIAL_FILE:
+ return "Path to HTTP basic credentials";
default:
throw new IllegalStateException("Not yet implemented: " + this);
}
@@ -606,4 +614,8 @@
public int getIOQueueSize() {
return appConfig.getInt(Option.IO_QUEUE_SIZE);
}
+
+ public String getCredentialFilePath() {
+ return getAppConfig().getString(Option.CREDENTIAL_FILE);
+ }
}