[ASTERIXDB-2268][CONF] Add Cores Multiplier

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add configurable cores multiplier to CC
  config and default it to 3. The multiplier
  will be used to adjust nodes cores capacity.
- Add test case for adjusted node capacity.

Change-Id: I95dd6e0c1add92e70e667321e8ef5f9b9887cda5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2326
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 712d2ec..367a1d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -59,6 +59,7 @@
     private final IResourceManager resourceManager;
     private final Map<String, NodeControllerState> nodeRegistry;
     private final Map<InetAddress, Set<String>> ipAddressNodeNameMap;
+    private final int nodeCoresMultiplier;
 
     public NodeManager(ClusterControllerService ccs, CCConfig ccConfig, IResourceManager resourceManager) {
         this.ccs = ccs;
@@ -66,6 +67,7 @@
         this.resourceManager = resourceManager;
         this.nodeRegistry = new LinkedHashMap<>();
         this.ipAddressNodeNameMap = new HashMap<>();
+        this.nodeCoresMultiplier = ccConfig.getCoresMultiplier();
     }
 
     @Override
@@ -122,7 +124,7 @@
         }
         // Updates the cluster capacity.
         LOGGER.warn("updating cluster capacity");
-        resourceManager.update(nodeId, ncState.getCapacity());
+        resourceManager.update(nodeId, getAdjustedNodeCapacity(ncState.getCapacity()));
     }
 
     @Override
@@ -218,4 +220,7 @@
         }
     }
 
+    private NodeCapacity getAdjustedNodeCapacity(NodeCapacity nodeCapacity) {
+        return new NodeCapacity(nodeCapacity.getMemoryByteSize(), nodeCapacity.getCores() * nodeCoresMultiplier);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 9b9a3b4..931e436 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -51,7 +51,10 @@
     @Test
     public void testNormal() throws HyracksException, IPCException {
         IResourceManager resourceManager = new ResourceManager();
-        INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
+        final CCConfig ccConfig = makeCCConfig();
+        final int coresMultiplier = 1;
+        ccConfig.setCoresMultiplier(coresMultiplier);
+        INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager);
         NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
         NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
 
@@ -74,6 +77,38 @@
     }
 
     @Test
+    public void testAdjustedNodeCapacity() throws HyracksException, IPCException {
+        IResourceManager resourceManager = new ResourceManager();
+        final CCConfig ccConfig = makeCCConfig();
+        final int coresMultiplier = 3;
+        ccConfig.setCoresMultiplier(coresMultiplier);
+        INodeManager nodeManager = new NodeManager(mockCcs(), ccConfig, resourceManager);
+        NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
+        NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
+
+        // verify state after adding two nodes
+        nodeManager.addNode(NODE1, ncState1);
+        nodeManager.addNode(NODE2, ncState2);
+        int activeNodes = 2;
+        // verify adjusted cores
+        Assert.assertEquals(NODE_CORES * activeNodes * coresMultiplier,
+                resourceManager.getCurrentCapacity().getAggregatedCores());
+        // verify unadjusted memory size
+        Assert.assertEquals(NODE_MEMORY_SIZE * activeNodes,
+                resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize());
+        // verify state after removing a node.
+        nodeManager.removeNode(NODE1);
+        activeNodes = 1;
+        Assert.assertEquals(NODE_CORES * activeNodes * coresMultiplier,
+                resourceManager.getCurrentCapacity().getAggregatedCores());
+        Assert.assertEquals(NODE_MEMORY_SIZE * activeNodes,
+                resourceManager.getCurrentCapacity().getAggregatedMemoryByteSize());
+        // verify state after removing last node
+        nodeManager.removeNode(NODE2);
+        verifyEmptyCluster(resourceManager, nodeManager);
+    }
+
+    @Test
     public void testException() throws HyracksException, IPCException {
         IResourceManager resourceManager = new ResourceManager();
         INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index c04d5b4..470e87c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -66,7 +66,8 @@
         JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
         JOB_QUEUE_CAPACITY(INTEGER, 4096),
         JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
-        ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
+        ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false),
+        CORES_MULTIPLIER(INTEGER, 3);
 
         private final IOptionType parser;
         private Object defaultValue;
@@ -161,6 +162,8 @@
                 case ENFORCE_FRAME_WRITER_PROTOCOL:
                     return "A flag indicating if runtime should enforce frame writer protocol and detect "
                             + "bad behaving operators";
+                case CORES_MULTIPLIER:
+                    return "Specifies the multiplier to use on the cluster available cores";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -363,4 +366,12 @@
     public void setEnforceFrameWriterProtocol(boolean enforce) {
         configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
     }
+
+    public void setCoresMultiplier(int coresMultiplier) {
+        configManager.set(Option.CORES_MULTIPLIER, coresMultiplier);
+    }
+
+    public int getCoresMultiplier() {
+        return getAppConfig().getInt(Option.CORES_MULTIPLIER);
+    }
 }