[ASTERIXDB-2435][*DB][CLUS] Block unknown nodes
Provide mechanism by which unauthorized nodes can be prevented from
joining hyracks cluster. In *DB, only authorize configured nodes to
join.
Change-Id: I3a45a41f69e8e9968dd65bb2268d3d3a2ced9664
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2851
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 5c05b2f..a5fd063 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -86,6 +86,7 @@
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
@@ -117,6 +118,7 @@
@Override
public void init(IServiceContext serviceCtx) throws Exception {
+ super.init(serviceCtx);
ccServiceCtx = (ICCServiceContext) serviceCtx;
ccServiceCtx.setThreadFactory(
new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
@@ -354,4 +356,8 @@
ApplicationConfigurator.validateJavaRuntime();
}
+ @Override
+ public IGatekeeper getGatekeeper() {
+ return getConfigManager().getAppConfig().getNCNames()::contains;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
index 6bcdd8a..aa1021b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.api.application;
import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
public interface ICCApplication extends IApplication {
@@ -26,4 +27,7 @@
IJobCapacityController getJobCapacityController();
IConfigManager getConfigManager();
+
+ IGatekeeper getGatekeeper();
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/IGatekeeper.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/IGatekeeper.java
new file mode 100644
index 0000000..43c8143
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/IGatekeeper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.control;
+
+@FunctionalInterface
+public interface IGatekeeper {
+ /**
+ * Indicates whether the supplied node is authorized to join this cluster
+ * @param nodeId
+ * the node to consider
+ * @return <code>true</code> if the supplied node is authorized
+ */
+ boolean isAuthorized(String nodeId);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index dc63481..46adda3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.util.HyracksConstants;
@@ -91,4 +92,8 @@
return configManager;
}
+ @Override
+ public IGatekeeper getGatekeeper() {
+ return node -> true;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 8ed015c..ae82803 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -179,7 +179,7 @@
threadDumpRunMap = Collections.synchronizedMap(new HashMap<>());
// Node manager is in charge of cluster membership management.
- nodeManager = new NodeManager(this, ccConfig, resourceManager);
+ nodeManager = new NodeManager(this, ccConfig, resourceManager, application.getGatekeeper());
ccId = ccConfig.getCcId();
jobIdFactory = new JobIdFactory(ccId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 31f989b..4f76ced 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -34,6 +34,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -45,8 +46,6 @@
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction;
-import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.util.annotations.Idempotent;
import org.apache.hyracks.util.annotations.NotThreadSafe;
@@ -63,14 +62,17 @@
private final Map<String, NodeControllerState> nodeRegistry;
private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
private final int nodeCoresMultiplier;
+ private final IGatekeeper gatekeeper;
- public NodeManager(ClusterControllerService ccs, CCConfig ccConfig, IResourceManager resourceManager) {
+ public NodeManager(ClusterControllerService ccs, CCConfig ccConfig, IResourceManager resourceManager,
+ IGatekeeper gatekeeper) {
this.ccs = ccs;
this.ccConfig = ccConfig;
this.resourceManager = resourceManager;
this.nodeRegistry = new LinkedHashMap<>();
this.ipAddressNodeNameMap = new HashMap<>();
this.nodeCoresMultiplier = ccConfig.getCoresMultiplier();
+ this.gatekeeper = gatekeeper;
}
@Override
@@ -95,13 +97,16 @@
@Override
public synchronized void addNode(String nodeId, NodeControllerState ncState) throws HyracksException {
- LOGGER.warn("addNode(" + nodeId + ") called");
+ LOGGER.warn("+addNode: " + nodeId);
if (nodeId == null || ncState == null) {
throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
}
+ if (!gatekeeper.isAuthorized(nodeId)) {
+ throw HyracksException.create(ErrorCode.NO_SUCH_NODE, nodeId);
+ }
// Updates the node registry.
if (nodeRegistry.containsKey(nodeId)) {
- LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering.");
+ LOGGER.warn("Node '" + nodeId + "' is already registered; failing the node then re-registering.");
failNode(nodeId);
}
try {
@@ -109,7 +114,7 @@
} catch (IPCException e) {
throw HyracksDataException.create(e);
}
- LOGGER.warn("adding node to registry");
+ LOGGER.info("adding node to registry");
nodeRegistry.put(nodeId, ncState);
// Updates the IP address to node names map.
try {
@@ -121,8 +126,7 @@
nodeRegistry.remove(nodeId);
throw e;
}
- // Updates the cluster capacity.
- LOGGER.warn("updating cluster capacity");
+ LOGGER.info("updating cluster capacity");
resourceManager.update(nodeId, getAdjustedNodeCapacity(ncState.getCapacity()));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 43b00f3..9d755a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -56,7 +56,7 @@
final CCConfig ccConfig = makeCCConfig();
final int coresMultiplier = 1;
ccConfig.setCoresMultiplier(coresMultiplier);
- INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager);
+ INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager, nodeId -> true);
NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
@@ -84,7 +84,7 @@
final CCConfig ccConfig = makeCCConfig();
final int coresMultiplier = 3;
ccConfig.setCoresMultiplier(coresMultiplier);
- INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager);
+ INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager, nodeId -> true);
NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
@@ -113,7 +113,7 @@
@Test
public void testException() throws HyracksException, IPCException {
IResourceManager resourceManager = new ResourceManager();
- INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
+ INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager, nodeId -> true);
NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
boolean invalidNetworkAddress = false;
@@ -142,7 +142,7 @@
@Test
public void testNullNode() throws HyracksException {
IResourceManager resourceManager = new ResourceManager();
- INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
+ INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager, nodeId -> true);
boolean invalidParameter = false;
// Verifies states after a failure during adding nodes.
@@ -217,4 +217,26 @@
Assert.assertTrue(nodeNotExist);
}
+ @Test
+ public void testUnauthorized() throws HyracksException, IPCException {
+ IResourceManager resourceManager = new ResourceManager();
+ final CCConfig ccConfig = makeCCConfig();
+ INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager, NODE1::equals);
+ NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
+ NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
+
+ nodeManager.addNode(NODE1, ncState1);
+ boolean nodeNotExist = false;
+ try {
+ nodeManager.addNode(NODE2, ncState2);
+ } catch (HyracksException e) {
+ nodeNotExist = e.getErrorCode() == ErrorCode.NO_SUCH_NODE;
+ }
+ Assert.assertTrue(nodeNotExist);
+ Assert.assertTrue(nodeManager.getIpAddressNodeNameMap().size() == 1);
+ Assert.assertTrue(nodeManager.getAllNodeIds().size() == 1);
+ Assert.assertTrue(nodeManager.getAllNodeControllerStates().size() == 1);
+ Assert.assertTrue(nodeManager.getNodeControllerState(NODE1) == ncState1);
+ }
+
}