[ASTERIXDB-2268][CONF] Add Cores Multiplier
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add configurable cores multiplier to CC
config and default it to 3. The multiplier
will be used to adjust nodes cores capacity.
- Add test case for adjusted node capacity.
Change-Id: I95dd6e0c1add92e70e667321e8ef5f9b9887cda5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2326
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 712d2ec..367a1d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -59,6 +59,7 @@
private final IResourceManager resourceManager;
private final Map<String, NodeControllerState> nodeRegistry;
private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
+ private final int nodeCoresMultiplier;
public NodeManager(ClusterControllerService ccs, CCConfig ccConfig, IResourceManager resourceManager) {
this.ccs = ccs;
@@ -66,6 +67,7 @@
this.resourceManager = resourceManager;
this.nodeRegistry = new LinkedHashMap<>();
this.ipAddressNodeNameMap = new HashMap<>();
+ this.nodeCoresMultiplier = ccConfig.getCoresMultiplier();
}
@Override
@@ -122,7 +124,7 @@
}
// Updates the cluster capacity.
LOGGER.warn("updating cluster capacity");
- resourceManager.update(nodeId, ncState.getCapacity());
+ resourceManager.update(nodeId, getAdjustedNodeCapacity(ncState.getCapacity()));
}
@Override
@@ -218,4 +220,7 @@
}
}
+ private NodeCapacity getAdjustedNodeCapacity(NodeCapacity nodeCapacity) {
+ return new NodeCapacity(nodeCapacity.getMemoryByteSize(), nodeCapacity.getCores() * nodeCoresMultiplier);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 9b9a3b4..931e436 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -51,7 +51,10 @@
@Test
public void testNormal() throws HyracksException, IPCException {
IResourceManager resourceManager = new ResourceManager();
- INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
+ final CCConfig ccConfig = makeCCConfig();
+ final int coresMultiplier = 1;
+ ccConfig.setCoresMultiplier(coresMultiplier);
+ INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager);
NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
@@ -74,6 +77,38 @@
}
@Test
+ public void testAdjustedNodeCapacity() throws HyracksException, IPCException {
+ IResourceManager resourceManager = new ResourceManager();
+ final CCConfig ccConfig = makeCCConfig();
+ final int coresMultiplier = 3;
+ ccConfig.setCoresMultiplier(coresMultiplier);
+ INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager);
+ NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
+ NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
+
+ // verify state after adding two nodes
+ nodeManager.addNode(NODE1, ncState1);
+ nodeManager.addNode(NODE2, ncState2);
+ int activeNodes = 2;
+ // verify adjusted cores
+ Assert.assertEquals(NODE_CORES * activeNodes * coresMultiplier,
+ resourceManager.getCurrentCapacity().getAggregatedCores());
+ // verify unadjusted memory size
+ Assert.assertEquals(NODE_MEMORY_SIZE * activeNodes,
+ resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize());
+ // verify state after removing a node.
+ nodeManager.removeNode(NODE1);
+ activeNodes = 1;
+ Assert.assertEquals(NODE_CORES * activeNodes * coresMultiplier,
+ resourceManager.getCurrentCapacity().getAggregatedCores());
+ Assert.assertEquals(NODE_MEMORY_SIZE * activeNodes,
+ resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize());
+ // verify state after removing last node
+ nodeManager.removeNode(NODE2);
+ verifyEmptyCluster(resourceManager, nodeManager);
+ }
+
+ @Test
public void testException() throws HyracksException, IPCException {
IResourceManager resourceManager = new ResourceManager();
INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index c04d5b4..470e87c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -66,7 +66,8 @@
JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
JOB_QUEUE_CAPACITY(INTEGER, 4096),
JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
- ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
+ ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false),
+ CORES_MULTIPLIER(INTEGER, 3);
private final IOptionType parser;
private Object defaultValue;
@@ -161,6 +162,8 @@
case ENFORCE_FRAME_WRITER_PROTOCOL:
return "A flag indicating if runtime should enforce frame writer protocol and detect "
+ "bad behaving operators";
+ case CORES_MULTIPLIER:
+ return "Specifies the multiplier to use on the cluster available cores";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -363,4 +366,12 @@
public void setEnforceFrameWriterProtocol(boolean enforce) {
configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
}
+
+ public void setCoresMultiplier(int coresMultiplier) {
+ configManager.set(Option.CORES_MULTIPLIER, coresMultiplier);
+ }
+
+ public int getCoresMultiplier() {
+ return getAppConfig().getInt(Option.CORES_MULTIPLIER);
+ }
}