[NO ISSUE] More multi-CC support, ConfigManager updates

- add ability for OptionTypes to natively parse JsonNodes (see the sketch following this list)
- allow all options to be overridden at the NC level, not just NC options (i.e. common and cc options too)
- accept the controller id from the CC, avoiding the need to configure it on NCs
- update all CCs with metadata bootstrap, not just the primary CC
- remove the TxnIdFactory static singleton; it is now managed by the metadata node
- remove unused build-properties-style test configs
- clean up test iodevices
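
A minimal sketch of the new IOptionType.parse(JsonNode) contract; the OptionTypes.INTEGER
constant and the surrounding driver code are assumptions for illustration, not part of this
change:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.hyracks.control.common.config.OptionTypes;

    public class JsonOptionParseSketch {
        public static void main(String[] args) throws java.io.IOException {
            // a JSON-supplied config value handed straight to an option type
            JsonNode node = new ObjectMapper().readTree("{\"cluster.port\": 1099}").get("cluster.port");
            // per the new IOptionType javadoc, a node that cannot be interpreted throws
            // IllegalArgumentException; the OptionTypes implementations return null for a JSON null
            Integer port = OptionTypes.INTEGER.parse(node);
            System.out.println(port); // prints 1099
        }
    }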

Change-Id: Iff60887bf71ce3f3ed7201afd9499612bfc83485
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2344
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
index 3d69ddb..0e04dca 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
@@ -18,23 +18,32 @@
  */
 package org.apache.hyracks.api.client;
 
+import org.apache.hyracks.api.control.CcId;
+
 import java.io.Serializable;
 
 public class ClusterControllerInfo implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    private final CcId ccId;
+
     private final String clientNetAddress;
 
     private final int clientNetPort;
 
     private final int webPort;
 
-    public ClusterControllerInfo(String clientNetAddress, int clientNetPort, int webPort) {
+    public ClusterControllerInfo(CcId ccId, String clientNetAddress, int clientNetPort, int webPort) {
+        this.ccId = ccId;
         this.clientNetAddress = clientNetAddress;
         this.clientNetPort = clientNetPort;
         this.webPort = webPort;
     }
 
+    public CcId getCcId() {
+        return ccId;
+    }
+
     public int getWebPort() {
         return webPort;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
index d2a254f..aee22c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.config;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public interface IOptionType<T> {
@@ -26,6 +27,11 @@
      */
     T parse(String s);
 
+    /**
+     * @throws IllegalArgumentException when the supplied JSON node cannot be interpreted
+     */
+    T parse(JsonNode node);
+
     Class<T> targetType();
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
index 32782fd..2a7be9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
@@ -45,6 +45,10 @@
         return id;
     }
 
+    public long toLongMask() {
+        return (long) id << CcIdPartitionedLongFactory.ID_BITS;
+    }
+
     @Override
     public int hashCode() {
         return id;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java
new file mode 100644
index 0000000..0a26494
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcIdPartitionedLongFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.control;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CcIdPartitionedLongFactory {
+    private static final int CC_BITS = Short.SIZE;
+    public static final int ID_BITS = Long.SIZE - CC_BITS;
+    public static final long MAX_ID = (1L << ID_BITS) - 1;
+    private final CcId ccId;
+    private final AtomicLong id;
+
+    public CcIdPartitionedLongFactory(CcId ccId) {
+        this.ccId = ccId;
+        id = new AtomicLong(ccId.toLongMask());
+    }
+
+    protected long nextId() {
+        return id.getAndUpdate(prev -> {
+            if ((prev & MAX_ID) == MAX_ID) {
+                return prev ^ MAX_ID;
+            } else {
+                return prev + 1;
+            }
+        });
+    }
+
+    protected long maxId() {
+        long next = id.get();
+        if ((next & MAX_ID) == 0) {
+            return next | MAX_ID;
+        } else {
+            return next - 1;
+        }
+    }
+
+    protected void ensureMinimumId(long id) {
+        if ((id & ~MAX_ID) != ccId.toLongMask()) {
+            throw new IllegalArgumentException("cannot change ccId as part of ensureMinimumId() (was: "
+                    + Long.toHexString(this.id.get()) + ", given: " + Long.toHexString(id) + ")");
+        }
+        this.id.updateAndGet(current -> Math.max(current, id));
+    }
+
+    public CcId getCcId() {
+        return ccId;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index c83366f..de6b5ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -22,23 +22,24 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.control.CcIdPartitionedLongFactory;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IWritable;
 
 public final class JobId implements IWritable, Serializable, Comparable {
 
-    private static final int CC_BITS = Short.SIZE;
-    static final int ID_BITS = Long.SIZE - CC_BITS;
-    static final long MAX_ID = (1L << ID_BITS) - 1;
+    private static final Pattern jobIdPattern = Pattern.compile("^JID:(\\d+)\\.(\\d+)$");
 
     public static final JobId INVALID = null;
 
     private static final long serialVersionUID = 1L;
     private long id;
-    private transient CcId ccId;
+    private transient volatile CcId ccId;
 
     public static JobId create(DataInput dis) throws IOException {
         JobId jobId = new JobId();
@@ -59,13 +60,13 @@
 
     public CcId getCcId() {
         if (ccId == null) {
-            ccId = CcId.valueOf((int) (id >>> ID_BITS));
+            ccId = CcId.valueOf((int) (id >>> CcIdPartitionedLongFactory.ID_BITS));
         }
         return ccId;
     }
 
     public long getIdOnly() {
-        return id & MAX_ID;
+        return id & CcIdPartitionedLongFactory.MAX_ID;
     }
 
     @Override
@@ -80,13 +81,17 @@
 
     @Override
     public String toString() {
-        return "JID:" + id;
+        return "JID:" + (id >>> CcIdPartitionedLongFactory.ID_BITS) + "." + getIdOnly();
     }
 
     public static JobId parse(String str) throws HyracksDataException {
-        if (str.startsWith("JID:")) {
-            str = str.substring(4);
-            return new JobId(Long.parseLong(str));
+        Matcher m = jobIdPattern.matcher(str);
+        if (m.matches()) {
+            int ccId = Integer.parseInt(m.group(1));
+            if (ccId <= 0xffff && ccId >= 0) {
+                long jobId = Long.parseLong(m.group(2)) | (long) ccId << CcIdPartitionedLongFactory.ID_BITS;
+                return new JobId(jobId);
+            }
         }
         throw HyracksDataException.create(ErrorCode.NOT_A_JOBID, str);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index 1bb5749..528d35b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -18,36 +18,23 @@
  */
 package org.apache.hyracks.api.job;
 
-import static org.apache.hyracks.api.job.JobId.ID_BITS;
-import static org.apache.hyracks.api.job.JobId.MAX_ID;
-
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.control.CcIdPartitionedLongFactory;
 
-public class JobIdFactory {
-    private final AtomicLong id;
-
+public class JobIdFactory extends CcIdPartitionedLongFactory {
     public JobIdFactory(CcId ccId) {
-        id = new AtomicLong((long) ccId.shortValue() << ID_BITS);
+        super(ccId);
     }
 
     public JobId create() {
-        return new JobId(id.getAndUpdate(prev -> {
-            if ((prev & MAX_ID) == MAX_ID) {
-                return prev ^ MAX_ID;
-            } else {
-                return prev + 1;
-            }
-        }));
+        return new JobId(nextId());
     }
 
     public JobId maxJobId() {
-        long next = id.get();
-        if ((next & MAX_ID) == 0) {
-            return new JobId(next | MAX_ID);
-        } else {
-            return new JobId(next - 1);
-        }
+        return new JobId(maxId());
+    }
+
+    public void setMaxJobId(long maxJobId) {
+        ensureMinimumId(maxJobId + 1);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java
similarity index 80%
rename from hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java
index 709f098..d2c1d09 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdTest.java
@@ -19,23 +19,26 @@
 package org.apache.hyracks.api.job;
 
 import java.lang.reflect.Field;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.control.CcIdPartitionedLongFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class JobIdFactoryTest {
+public class JobIdTest {
 
     private static Field idField;
 
     @BeforeClass
     public static void setup() throws NoSuchFieldException {
-        idField = JobIdFactory.class.getDeclaredField("id");
+        idField = CcIdPartitionedLongFactory.class.getDeclaredField("id");
         idField.setAccessible(true);
     }
 
@@ -77,7 +80,7 @@
         theId.set((((long) 1 << 48) - 1) | expected);
         JobId jobId = factory.create();
         Assert.assertEquals(ccId, jobId.getCcId());
-        Assert.assertEquals(JobId.MAX_ID, jobId.getIdOnly());
+        Assert.assertEquals(CcIdPartitionedLongFactory.MAX_ID, jobId.getIdOnly());
         jobId = factory.create();
         Assert.assertEquals(ccId, jobId.getCcId());
         Assert.assertEquals(0, jobId.getIdOnly());
@@ -115,4 +118,18 @@
         } catch (IllegalArgumentException e) {
         }
     }
+
+    @Test
+    public void testParse() throws HyracksDataException {
+        for (int ccId : Arrays.asList(0xFFFF, 0, (int) Short.MAX_VALUE)) {
+            JobIdFactory factory = new JobIdFactory(CcId.valueOf(ccId));
+            for (int i = 0; i < 1000; i++) {
+                final JobId jobId = factory.create();
+                Assert.assertEquals(jobId.getId(), JobId.parse(jobId.toString()).getId());
+                Assert.assertEquals(jobId, JobId.parse(jobId.toString()));
+                Assert.assertFalse(jobId.toString(), jobId.toString().contains("-"));
+                System.err.println(jobId.toString());
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a6edd70..f8fe77f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -215,7 +215,7 @@
         clusterIPC.start();
         clientIPC.start();
         webServer.start();
-        info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
+        info = new ClusterControllerInfo(ccId, ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
                 webServer.getListeningPort());
         timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis());
         jobLog.open();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index 26245e1..de166dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -42,7 +42,6 @@
 import org.apache.hyracks.control.common.application.ServiceContext;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
-import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class CCServiceContext extends ServiceContext implements ICCServiceContext {
     private final ICCContext ccContext;
@@ -50,9 +49,6 @@
     protected final Set<String> initPendingNodeIds;
     protected final Set<String> deinitPendingNodeIds;
 
-    protected IResultCallback<Object> initializationCallback;
-    protected IResultCallback<Object> deinitializationCallback;
-
     private List<IJobLifecycleListener> jobLifecycleListeners;
     private List<IClusterLifecycleListener> clusterLifecycleListeners;
     private final ClusterControllerService ccs;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 77ecbee..96f5f1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -71,6 +71,7 @@
             params.setDistributedState(ccs.getContext().getDistributedState());
             params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
             params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
+            params.setRegistrationId(reg.getRegistrationId());
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Node registration failed", e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 9cf84dd..a95ae3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -73,6 +73,4 @@
     void getNodeControllerInfos() throws Exception;
 
     void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
-
-    CcId getCcId();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index 986ca96..fd8c116 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -73,7 +73,9 @@
             new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator());
     private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class);
     @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
-    private Map<String, Map<IOption, Object>> nodeSpecificMap = Collections.synchronizedMap(new TreeMap<>());
+    private Map<String, Map<IOption, Object>> nodeSpecificDefinedMap = Collections.synchronizedMap(new TreeMap<>());
+    @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
+    private Map<String, Map<IOption, Object>> nodeSpecificDefaultMap = Collections.synchronizedMap(new TreeMap<>());
     private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>();
     private final String[] args;
     private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this);
@@ -154,26 +156,28 @@
                 }
             } else {
                 registeredOptions.add(option);
-                optionSetters.put(option,
-                        (node, value,
-                                isDefault) -> correctedMap(option.section() == Section.NC ? node : null, isDefault)
-                                        .put(option, value));
+                optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
                 if (LOGGER.isDebugEnabled()) {
-                    optionSetters.put(option, (node, value, isDefault) -> LOGGER
-                            .debug((isDefault ? "defaulting" : "setting ") + option.toIniString() + " to " + value));
+                    optionSetters.put(option, (node, value, isDefault) -> LOGGER.debug("{} {} to {} for node {}",
+                            isDefault ? "defaulting" : "setting", option.toIniString(), value, node));
                 }
             }
         }
     }
 
     private Map<IOption, Object> correctedMap(String node, boolean isDefault) {
-        return node == null ? (isDefault ? defaultMap : definedMap)
-                : nodeSpecificMap.computeIfAbsent(node, this::createNodeSpecificMap);
+        if (node == null) {
+            return isDefault ? defaultMap : definedMap;
+        } else {
+            ensureNode(node);
+            return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node);
+        }
     }
 
     public void ensureNode(String nodeId) {
         LOGGER.debug("ensureNode: " + nodeId);
-        nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+        nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+        nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
     }
 
     private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
@@ -352,17 +356,13 @@
     private void applyDefaults() {
         LOGGER.debug("applying defaults");
         sectionMap.forEach((key, value) -> {
-            if (key == Section.NC) {
-                value.values().forEach(option -> getNodeNames()
-                        .forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node)));
-                for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificMap.entrySet()) {
-                    value.values()
-                            .forEach(option -> getOrDefault(
-                                    new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option,
-                                    nodeMap.getKey()));
-                }
-            } else {
-                value.values().forEach(option -> getOrDefault(configurationMap, option, null));
+            value.values().forEach(
+                    option -> getNodeNames().forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node)));
+            for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificDefinedMap.entrySet()) {
+                value.values()
+                        .forEach(option -> getOrDefault(
+                                new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option,
+                                nodeMap.getKey()));
             }
         });
     }
@@ -433,17 +433,18 @@
     }
 
     public List<String> getNodeNames() {
-        return Collections.unmodifiableList(new ArrayList<>(nodeSpecificMap.keySet()));
+        return Collections.unmodifiableList(new ArrayList<>(nodeSpecificDefinedMap.keySet()));
     }
 
     public IApplicationConfig getNodeEffectiveConfig(String nodeId) {
-        final Map<IOption, Object> nodeMap = nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+        ensureNode(nodeId);
+        final Map<IOption, Object> nodeMap = nodeSpecificDefaultMap.get(nodeId);
         Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId);
         return new ConfigManagerApplicationConfig(this) {
             @Override
             public Object getStatic(IOption option) {
                 if (!nodeEffectiveMap.containsKey(option)) {
-                    // we need to calculate the default the the context of the node specific map...
+                    // we need to calculate the default within the context of the node specific map...
                     nodeMap.put(option, getOrDefault(nodeEffectiveMap, option, nodeId));
                 }
                 return nodeEffectiveMap.get(option);
@@ -451,8 +452,14 @@
         };
     }
 
-    private CompositeMap<IOption, Object> getNodeEffectiveMap(String nodeId) {
-        return new CompositeMap<>(nodeSpecificMap.get(nodeId), definedMap, new NoOpMapMutator());
+    private Map<IOption, Object> getNodeEffectiveMap(String nodeId) {
+        ensureNode(nodeId);
+        CompositeMap<IOption, Object> nodeEffectiveMap = new CompositeMap<>();
+        nodeEffectiveMap.setMutator(new NoOpMapMutator());
+        nodeEffectiveMap.addComposited(nodeSpecificDefinedMap.get(nodeId));
+        nodeEffectiveMap.addComposited(nodeSpecificDefaultMap.get(nodeId));
+        nodeEffectiveMap.addComposited(definedMap);
+        return nodeEffectiveMap;
     }
 
     public Ini toIni(boolean includeDefaults) {
@@ -462,8 +469,11 @@
                 ini.add(option.section().sectionName(), option.ini(), option.type().serializeToIni(value));
             }
         });
-        nodeSpecificMap.forEach((key, nodeValueMap) -> {
+        for (String key : getNodeNames()) {
             String section = Section.NC.sectionName() + "/" + key;
+            ensureNode(key);
+            Map<IOption, Object> nodeValueMap =
+                    includeDefaults ? getNodeEffectiveMap(key) : nodeSpecificDefinedMap.get(key);
             synchronized (nodeValueMap) {
                 for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) {
                     if (entry.getValue() != null) {
@@ -472,10 +482,9 @@
                     }
                 }
             }
-        });
-        extensionOptions.forEach((extension, options) -> {
-            options.forEach(option -> ini.add(extension, option.getKey(), option.getValue()));
-        });
+        }
+        extensionOptions.forEach((extension, options) -> options
+                .forEach(option -> ini.add(extension, option.getKey(), option.getValue())));
         return ini;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index 3807a00..b188548 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -19,12 +19,16 @@
 package org.apache.hyracks.control.common.config;
 
 import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.logging.log4j.Level;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class OptionTypes {
@@ -43,6 +47,11 @@
         }
 
         @Override
+        public Integer parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
         public Class<Integer> targetType() {
             return Integer.class;
         }
@@ -65,6 +74,11 @@
         }
 
         @Override
+        public Long parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
         public Class<Long> targetType() {
             return Long.class;
         }
@@ -84,13 +98,22 @@
         @Override
         public Short parse(String s) {
             int value = Integer.decode(s);
-            if (Integer.highestOneBit(value) > 16) {
-                throw new IllegalArgumentException("The given value " + s + " is too big for a short");
+            return validateShort(value);
+        }
+
+        private Short validateShort(int value) {
+            if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) {
+                throw new IllegalArgumentException("The given value " + value + " does not fit in a short");
             }
             return (short) value;
         }
 
         @Override
+        public Short parse(JsonNode node) {
+            return node.isNull() ? null : validateShort(node.asInt());
+        }
+
+        @Override
         public Class<Short> targetType() {
             return Short.class;
         }
@@ -108,6 +131,11 @@
         }
 
         @Override
+        public Integer parse(JsonNode node) {
+            return node.isNull() ? null : node.asInt();
+        }
+
+        @Override
         public Class<Integer> targetType() {
             return Integer.class;
         }
@@ -125,6 +153,11 @@
         }
 
         @Override
+        public Double parse(JsonNode node) {
+            return node.isNull() ? null : node.asDouble();
+        }
+
+        @Override
         public Class<Double> targetType() {
             return Double.class;
         }
@@ -142,6 +175,11 @@
         }
 
         @Override
+        public String parse(JsonNode node) {
+            return node.isNull() ? null : node.asText();
+        }
+
+        @Override
         public Class<String> targetType() {
             return String.class;
         }
@@ -159,6 +197,11 @@
         }
 
         @Override
+        public Long parse(JsonNode node) {
+            return node.isNull() ? null : node.asLong();
+        }
+
+        @Override
         public Class<Long> targetType() {
             return Long.class;
         }
@@ -176,6 +219,11 @@
         }
 
         @Override
+        public Boolean parse(JsonNode node) {
+            return node.isNull() ? null : node.asBoolean();
+        }
+
+        @Override
         public Class<Boolean> targetType() {
             return Boolean.class;
         }
@@ -200,6 +248,11 @@
         }
 
         @Override
+        public Level parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
         public Class<Level> targetType() {
             return Level.class;
         }
@@ -227,6 +280,20 @@
         }
 
         @Override
+        public String[] parse(JsonNode node) {
+            if (node.isNull()) {
+                return null;
+            }
+            List<String> strings = new ArrayList<>();
+            if (node instanceof ArrayNode) {
+                node.elements().forEachRemaining(n -> strings.add(n.asText()));
+                return strings.toArray(new String[strings.size()]);
+            } else {
+                return parse(node.asText());
+            }
+        }
+
+        @Override
         public Class<String[]> targetType() {
             return String[].class;
         }
@@ -253,6 +320,11 @@
         }
 
         @Override
+        public java.net.URL parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
         public Class<java.net.URL> targetType() {
             return java.net.URL.class;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 519bafc..75c0827 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -50,7 +50,6 @@
         NCSERVICE_PORT(INTEGER, 9090),
         CLUSTER_ADDRESS(STRING, (String) null),
         CLUSTER_PORT(INTEGER, 1099),
-        CLUSTER_CONTROLLER_ID(SHORT, (short) 0x0000),
         CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
         CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
         NODE_ID(STRING, (String) null),
@@ -144,8 +143,6 @@
                     return "Cluster Controller port";
                 case CLUSTER_LISTEN_PORT:
                     return "IP port to bind cluster listener";
-                case CLUSTER_CONTROLLER_ID:
-                    return "16-bit (0-65535) id of the Cluster Controller";
                 case CLUSTER_PUBLIC_ADDRESS:
                     return "Public IP Address to announce cluster listener";
                 case CLUSTER_PUBLIC_PORT:
@@ -313,10 +310,6 @@
         configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort);
     }
 
-    public CcId getClusterControllerId() {
-        return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID));
-    }
-
     public String getClusterListenAddress() {
         return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
index bf233a8..e78a423 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
@@ -33,6 +33,8 @@
 
     private int profileDumpPeriod;
 
+    private int registrationId;
+
     public ClusterControllerInfo getClusterControllerInfo() {
         return ccInfo;
     }
@@ -64,4 +66,12 @@
     public void setProfileDumpPeriod(int profileDumpPeriod) {
         this.profileDumpPeriod = profileDumpPeriod;
     }
+
+    public int getRegistrationId() {
+        return registrationId;
+    }
+
+    public void setRegistrationId(int registrationId) {
+        this.registrationId = registrationId;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 75ef0b7..a87c30a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -72,13 +73,15 @@
 
     private final NodeCapacity capacity;
 
-    private final long maxJobId;
+    private final int registrationId;
+
+    private static final AtomicInteger nextRegistrationId = new AtomicInteger();
 
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
             NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
             String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
             List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema,
-            NetworkAddress messagingPort, NodeCapacity capacity, int pid, long maxJobId) {
+            NetworkAddress messagingPort, NodeCapacity capacity, int pid) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
@@ -100,7 +103,7 @@
         this.messagingPort = messagingPort;
         this.capacity = capacity;
         this.pid = pid;
-        this.maxJobId = maxJobId;
+        this.registrationId = nextRegistrationId.getAndIncrement();
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -187,7 +190,7 @@
         return pid;
     }
 
-    public long getMaxJobId() {
-        return maxJobId;
+    public int getRegistrationId() {
+        return registrationId;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0fdafe3..ae40ea3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -57,11 +57,9 @@
 
 public class ClusterControllerRemoteProxy implements IClusterController {
 
-    private final CcId ccId;
     private IIPCHandle ipcHandle;
 
-    public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
-        this.ccId = ccId;
+    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
         this.ipcHandle = ipcHandle;
     }
 
@@ -178,12 +176,7 @@
     }
 
     @Override
-    public CcId getCcId() {
-        return ccId;
-    }
-
-    @Override
     public String toString() {
-        return getClass().getSimpleName() + " " + ccId + " [" + ipcHandle.getRemoteAddress() + "]";
+        return getClass().getSimpleName() + " [" + ipcHandle.getRemoteAddress() + "]";
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
new file mode 100644
index 0000000..63fffb4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.control.common.base.IClusterController;
+import org.apache.hyracks.control.common.controllers.NodeParameters;
+import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CcConnection {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final IClusterController ccs;
+    private boolean registrationPending;
+    private Exception registrationException;
+    private NodeParameters nodeParameters;
+
+    CcConnection(IClusterController ccs) {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public String toString() {
+        return ccs.toString();
+    }
+
+    public CcId getCcId() {
+        return getNodeParameters().getClusterControllerInfo().getCcId();
+    }
+
+    synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+        nodeParameters = parameters;
+        registrationException = exception;
+        registrationPending = false;
+        notifyAll();
+    }
+
+    public synchronized CcId registerNode(NodeRegistration nodeRegistration) throws Exception {
+        registrationPending = true;
+        ccs.registerNode(nodeRegistration);
+        while (registrationPending) {
+            wait();
+        }
+        if (registrationException != null) {
+            LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, registrationException);
+            throw registrationException;
+        }
+        return getCcId();
+    }
+
+    public IClusterController getClusterControllerService() {
+        return ccs;
+    }
+
+    public NodeParameters getNodeParameters() {
+        return nodeParameters;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
index ec8cf27..a03e0ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
@@ -53,7 +53,7 @@
             LOGGER.log(Level.DEBUG, "Exception parsing command line: " + Arrays.toString(args), e);
             System.exit(2);
         } catch (Exception e) {
-            LOGGER.log(Level.DEBUG, "Exiting NCDriver due to exception", e);
+            LOGGER.error("Exiting NCDriver due to exception", e);
             System.exit(1);
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0e74a4c..b1909dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
@@ -84,6 +85,7 @@
 import org.apache.hyracks.control.nc.net.NetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
+import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
 import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
@@ -108,7 +110,7 @@
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
     private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
 
-    private NCConfig ncConfig;
+    private final NCConfig ncConfig;
 
     private final String id;
 
@@ -128,13 +130,15 @@
 
     private final Timer timer;
 
-    private boolean registrationPending;
+    private CcId primaryCcId;
 
-    private Exception registrationException;
+    private final Object ccLock = new Object();
 
-    private IClusterController primaryCcs;
+    private final Map<CcId, CcConnection> ccMap = Collections.synchronizedMap(new HashMap<>());
 
-    private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>());
+    private final Map<InetSocketAddress, CcId> ccAddressMap = Collections.synchronizedMap(new HashMap<>());
+
+    private final Map<Integer, CcConnection> pendingRegistrations = Collections.synchronizedMap(new HashMap<>());
 
     private final Map<JobId, Joblet> jobletMap;
 
@@ -144,11 +148,9 @@
 
     private ExecutorService executor;
 
-    private NodeParameters nodeParameters;
+    private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>();
 
-    private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>();
-
-    private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>();
+    private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>();
 
     private final ServerContext serverCtx;
 
@@ -180,9 +182,7 @@
 
     private final ConfigManager configManager;
 
-    private NodeRegistration nodeRegistration;
-
-    private final AtomicLong maxJobId = new AtomicLong(-1);
+    private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>();
 
     static {
         ExitUtil.init();
@@ -254,7 +254,7 @@
             }
             getNodeControllerInfosAcceptor.setValue(fv);
         }
-        primaryCcs.getNodeControllerInfos();
+        getPrimaryClusterController().getNodeControllerInfos();
         return fv.get();
     }
 
@@ -302,9 +302,7 @@
             messagingNetManager.start();
         }
 
-        final InetSocketAddress ccAddress =
-                new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort());
-        this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress);
+        this.primaryCcId = addCc(new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
 
         workQueue.start();
 
@@ -315,64 +313,81 @@
         application.startupCompleted();
     }
 
-    public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception {
-        ClusterControllerRemoteProxy ccProxy;
-        synchronized (ccsMap) {
-            if (ccsMap.containsKey(ccId)) {
-                throw new IllegalStateException("cc already registered: " + ccId);
+    public CcId addCc(InetSocketAddress ccAddress) throws Exception {
+        synchronized (ccLock) {
+            LOGGER.info("addCc: {}", ccAddress);
+            if (ccAddress.isUnresolved()) {
+                throw new IllegalArgumentException("must use resolved InetSocketAddress");
+            }
+            if (ccAddressMap.containsKey(ccAddress)) {
+                throw new IllegalStateException("cc already registered: " + ccAddress);
             }
             final IIPCEventListener ipcEventListener = new IIPCEventListener() {
                 @Override
                 public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
                     // we need to re-register in case of NC -> CC connection reset
                     try {
-                        registerNode(ccsMap.get(ccId));
+                        registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress);
                     } catch (Exception e) {
                         LOGGER.log(Level.WARN, "Failed Registering with cc", e);
                         throw new IPCException(e);
                     }
                 }
             };
-            ccProxy = new ClusterControllerRemoteProxy(ccId,
+            ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy(
                     ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            registerNode(ccProxy);
-            ccsMap.put(ccId, ccProxy);
-        }
-        return ccProxy;
-    }
-
-    public void makePrimaryCc(CcId ccId) throws Exception {
-        synchronized (ccsMap) {
-            if (!ccsMap.containsKey(ccId)) {
-                throw new IllegalArgumentException("unknown cc: " + ccId);
-            }
-            primaryCcs = ccsMap.get(ccId);
+            CcConnection ccc = new CcConnection(ccProxy);
+            return registerNode(ccc, ccAddress);
         }
     }
 
-    public void removeCc(CcId ccId) throws Exception {
-        synchronized (ccsMap) {
-            final IClusterController ccs = ccsMap.get(ccId);
-            if (ccs == null) {
-                throw new IllegalArgumentException("unknown cc: " + ccId);
+    public void makePrimaryCc(InetSocketAddress ccAddress) throws Exception {
+        LOGGER.info("makePrimaryCc: {}", ccAddress);
+        if (ccAddress.isUnresolved()) {
+            throw new IllegalArgumentException("must use resolved InetSocketAddress");
+        }
+        CcId newPrimaryCc = ccAddressMap.get(ccAddress);
+        if (newPrimaryCc == null) {
+            throw new IllegalArgumentException("unknown cc: " + ccAddress);
+        }
+        this.primaryCcId = newPrimaryCc;
+    }
+
+    public void removeCc(InetSocketAddress ccAddress) throws Exception {
+        synchronized (ccLock) {
+            LOGGER.info("removeCc: {}", ccAddress);
+            if (ccAddress.isUnresolved()) {
+                throw new IllegalArgumentException("must use resolved InetSocketAddress");
             }
-            if (primaryCcs.equals(ccs)) {
-                throw new IllegalStateException("cannot remove primary cc: " + ccId);
+            CcId ccId = ccAddressMap.get(ccAddress);
+            if (ccId == null) {
+                LOGGER.warn("ignoring request to remove unknown cc: {}", ccAddress);
+                return;
             }
-            // TODO(mblow): consider how to handle running jobs
-            ccs.unregisterNode(id);
-            Thread hbThread = heartbeatThreads.remove(ccs);
+            if (primaryCcId.equals(ccId)) {
+                throw new IllegalStateException("cannot remove primary cc: " + ccAddress);
+            }
+            try {
+                final CcConnection ccc = getCcConnection(ccId);
+                ccc.getClusterControllerService().unregisterNode(id);
+            } catch (Exception e) {
+                LOGGER.warn("ignoring exception trying to gracefully unregister cc {}: {}", () -> ccId,
+                        () -> String.valueOf(e));
+            }
+            getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId));
+            Thread hbThread = heartbeatThreads.remove(ccId);
             hbThread.interrupt();
-            Timer ccTimer = ccTimers.remove(ccs);
+            Timer ccTimer = ccTimers.remove(ccId);
             if (ccTimer != null) {
                 ccTimer.cancel();
             }
+            ccMap.remove(ccId);
+            ccAddressMap.remove(ccAddress);
         }
     }
 
-    protected void registerNode(IClusterController ccs) throws Exception {
-        LOGGER.info("Registering with Cluster Controller {}", ccs);
-        registrationPending = true;
+    protected CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
+        LOGGER.info("Registering with Cluster Controller {}", ccc);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
@@ -389,54 +404,70 @@
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
         NetworkAddress messagingAddress =
                 messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
-        int allCores = osMXBean.getAvailableProcessors();
-        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, osMXBean.getName(),
-                osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
-                runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(),
-                runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(),
-                PidHelper.getPid(), maxJobId.get());
+        NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress,
+                application.getCapacity(), PidHelper.getPid());
 
-        ccs.registerNode(nodeRegistration);
-
-        completeNodeRegistration(ccs);
+        pendingRegistrations.put(nodeRegistration.getRegistrationId(), ccc);
+        CcId ccId = ccc.registerNode(nodeRegistration);
+        ccMap.put(ccId, ccc);
+        ccAddressMap.put(ccAddress, ccId);
+        Serializable distributedState = ccc.getNodeParameters().getDistributedState();
+        if (distributedState != null) {
+            getDistributedState().put(ccId, distributedState);
+        }
+        application.onRegisterNode(ccId);
+        IClusterController ccs = ccc.getClusterControllerService();
+        NodeParameters nodeParameters = ccc.getNodeParameters();
 
         // Start heartbeat generator.
-        if (!heartbeatThreads.containsKey(ccs)) {
+        if (!heartbeatThreads.containsKey(ccId)) {
             Thread heartbeatThread =
                     new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
             heartbeatThread.setPriority(Thread.MAX_PRIORITY);
             heartbeatThread.setDaemon(true);
             heartbeatThread.start();
-            heartbeatThreads.put(ccs, heartbeatThread);
+            heartbeatThreads.put(ccId, heartbeatThread);
         }
-        if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) {
-            Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true);
+        if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) {
+            Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
-            ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
-            ccTimers.put(ccs, ccTimer);
+            ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod());
+            ccTimers.put(ccId, ccTimer);
         }
 
-        LOGGER.info("Registering with Cluster Controller {} complete", ccs);
+        LOGGER.info("Registering with Cluster Controller {} complete", ccc);
+        return ccId;
     }
 
-    synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
-        this.nodeParameters = parameters;
-        this.registrationException = exception;
-        this.registrationPending = false;
-        notifyAll();
+    void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+        CcConnection ccc = getPendingNodeRegistration(parameters);
+        ccc.setNodeRegistrationResult(parameters, exception);
     }
 
-    private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception {
-        while (registrationPending) {
-            wait();
+    private CcConnection getCcConnection(CcId ccId) {
+        CcConnection ccConnection = ccMap.get(ccId);
+        if (ccConnection == null) {
+            throw new IllegalArgumentException("unknown ccId: " + ccId);
         }
-        if (registrationException != null) {
-            LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", registrationException);
-            throw registrationException;
+        return ccConnection;
+    }
+
+    private CcConnection getPendingNodeRegistration(NodeParameters nodeParameters) {
+        CcConnection ccConnection = pendingRegistrations.remove(nodeParameters.getRegistrationId());
+        if (ccConnection == null) {
+            throw new IllegalStateException("Unknown pending node registration " + nodeParameters.getRegistrationId()
+                    + " for " + nodeParameters.getClusterControllerInfo().getCcId());
         }
-        serviceCtx.setDistributedState(nodeParameters.getDistributedState());
-        application.onRegisterNode(ccs.getCcId());
+        return ccConnection;
+    }
+
+    private ConcurrentHashMap<CcId, Serializable> getDistributedState() {
+        //noinspection unchecked
+        return (ConcurrentHashMap<CcId, Serializable>) serviceCtx.getDistributedState();
     }
 
     private void startApplication() throws Exception {
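Note: the side that enqueues the pending registration is not in this hunk; only its consumption via getPendingNodeRegistration is visible above. The sketch below is illustrative only (not the actual Hyracks code), assuming the NC files each outbound registration under a registration id before contacting the CC so the asynchronous setNodeRegistrationResult callback can find the matching CcConnection; the class, method names, and the int id type are hypothetical.

// Illustrative sketch: a registration-id-keyed pending map routes the
// asynchronous registration reply back to whichever connection started it.
import java.util.concurrent.ConcurrentHashMap;

class PendingRegistrations<C> {
    private final ConcurrentHashMap<Integer, C> pending = new ConcurrentHashMap<>();

    // filed before the NC sends its registration request to a CC
    void add(int registrationId, C connection) {
        pending.put(registrationId, connection);
    }

    // consumed when the CC's reply arrives (cf. setNodeRegistrationResult above)
    C complete(int registrationId) {
        C connection = pending.remove(registrationId);
        if (connection == null) {
            throw new IllegalStateException("Unknown pending node registration " + registrationId);
        }
        return connection;
    }
}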
@@ -448,7 +479,12 @@
     }
 
     public void updateMaxJobId(JobId jobId) {
-        maxJobId.getAndUpdate(currentMaxId -> Math.max(currentMaxId, jobId.getId()));
+        maxJobIds.computeIfAbsent(jobId.getCcId(), key -> new AtomicLong())
+                .getAndUpdate(currentMaxId -> Math.max(currentMaxId, jobId.getId()));
+    }
+
+    public long getMaxJobId(CcId ccId) {
+        return maxJobIds.computeIfAbsent(ccId, key -> new AtomicLong(ccId.toLongMask())).get();
     }
 
     @Override
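Worth noting why getMaxJobId seeds a fresh counter with ccId.toLongMask(): job ids are now tracked per CC, and seeding each counter with the CC's mask keeps the ranges disjoint across controllers. A minimal sketch of the idea follows; the bit position is an assumption for illustration (the real layout lives in CcId/JobId), and toLongMask here is a hypothetical stand-in for CcId.toLongMask().

// Sketch only; assumes the 16-bit CC id is folded into the upper bits of the long.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PerCcMaxJobId {
    private final ConcurrentHashMap<Short, AtomicLong> maxJobIds = new ConcurrentHashMap<>();

    private static long toLongMask(short ccId) {
        return ((long) ccId) << 48; // assumed bit position, for illustration only
    }

    void update(short ccId, long jobId) {
        maxJobIds.computeIfAbsent(ccId, k -> new AtomicLong(toLongMask(ccId)))
                .getAndUpdate(current -> Math.max(current, jobId));
    }

    long get(short ccId) {
        return maxJobIds.computeIfAbsent(ccId, k -> new AtomicLong(toLongMask(ccId))).get();
    }
}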
@@ -478,10 +514,10 @@
                 t.interrupt();
                 InvokeUtil.doUninterruptibly(() -> t.join(1000));
             });
-            synchronized (ccsMap) {
-                ccsMap.values().parallelStream().forEach(ccs -> {
+            synchronized (ccLock) {
+                ccMap.values().parallelStream().forEach(cc -> {
                     try {
-                        ccs.notifyShutdown(id);
+                        cc.getClusterControllerService().notifyShutdown(id);
                     } catch (Exception e) {
                         LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
                     }
@@ -520,13 +556,8 @@
         jobParameterByteStoreMap.remove(jobId);
     }
 
-    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) throws HyracksException {
-        JobParameterByteStore jpbs = jobParameterByteStoreMap.get(jobId);
-        if (jpbs == null) {
-            jpbs = new JobParameterByteStore();
-            jobParameterByteStoreMap.put(jobId, jpbs);
-        }
-        return jpbs;
+    public JobParameterByteStore createOrGetJobParameterByteStore(JobId jobId) {
+        return jobParameterByteStoreMap.computeIfAbsent(jobId, jid -> new JobParameterByteStore());
     }
 
     public void storeActivityClusterGraph(DeployedJobSpecId deployedJobSpecId, ActivityClusterGraph acg)
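The createOrGetJobParameterByteStore rewrite is not just shorter: the old get-then-put pair is not atomic on a concurrent map, so two callers arriving for the same JobId could each install their own byte store. computeIfAbsent collapses the check and insert into one atomic map operation. A tiny generic illustration (not Hyracks code) of the difference:

// Two threads can both observe "absent" in racy() and install different values;
// atomic() guarantees a single winner per key.
import java.util.concurrent.ConcurrentHashMap;

class CheckThenPutDemo {
    static final ConcurrentHashMap<Long, Object> map = new ConcurrentHashMap<>();

    static Object racy(long key) {
        Object v = map.get(key);
        if (v == null) {
            v = new Object();
            map.put(key, v);      // may overwrite another thread's value
        }
        return v;                 // two callers can end up with different objects
    }

    static Object atomic(long key) {
        return map.computeIfAbsent(key, k -> new Object()); // single winner
    }
}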
@@ -550,7 +581,7 @@
         }
     }
 
-    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) throws HyracksException {
+    public ActivityClusterGraph getActivityClusterGraph(DeployedJobSpecId deployedJobSpecId) {
         return deployedJobSpecActivityClusterGraphMap.get(deployedJobSpecId.getId());
     }
 
@@ -566,16 +597,21 @@
         return partitionManager;
     }
 
+    public CcId getPrimaryCcId() {
+        // TODO(mblow): this can change at any time, need notification framework
+        return primaryCcId;
+    }
+
     public IClusterController getPrimaryClusterController() {
-        return primaryCcs;
+        return getClusterController(primaryCcId);
     }
 
     public IClusterController getClusterController(CcId ccId) {
-        return ccsMap.get(ccId);
+        return getCcConnection(ccId).getClusterControllerService();
     }
 
-    public NodeParameters getNodeParameters() {
-        return nodeParameters;
+    public NodeParameters getNodeParameters(CcId ccId) {
+        return getCcConnection(ccId).getNodeParameters();
     }
 
     @Override
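Since nodeParameters is no longer a single field, callers must now name the CC they are talking to. An illustrative fragment using only the accessors introduced above (the call sites themselves are not part of this diff; ncs stands for the enclosing NodeControllerService):

// Hypothetical caller-side usage of the new per-CC accessors.
CcId primary = ncs.getPrimaryCcId();                        // may change; see TODO above
NodeParameters params = ncs.getNodeParameters(primary);     // resolved via the CcConnection
IClusterController cc = ncs.getClusterController(primary);  // throws IllegalArgumentException for an unknown CcId
long heartbeatPeriod = params.getHeartbeatPeriod();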
@@ -691,17 +727,19 @@
     }
 
     private class ProfileDumpTask extends TimerTask {
-        private IClusterController cc;
+        private final IClusterController cc;
+        private final CcId ccId;
 
-        public ProfileDumpTask(IClusterController cc) {
+        public ProfileDumpTask(IClusterController cc, CcId ccId) {
             this.cc = cc;
+            this.ccId = ccId;
         }
 
         @Override
         public void run() {
             try {
                 FutureValue<List<JobProfile>> fv = new FutureValue<>();
-                BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv);
+                BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, ccId, fv);
                 workQueue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
                 if (!profiles.isEmpty()) {
@@ -734,7 +772,7 @@
     }
 
     public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
-        ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id);
+        getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
     }
 
     public IDatasetPartitionManager getDatasetPartitionManager() {
@@ -759,4 +797,5 @@
     public Object getApplicationContext() {
         return application.getApplicationContext();
     }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index 6a75471..87330226 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
@@ -50,7 +51,7 @@
 
     public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager, String nodeId,
             MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
-            IApplicationConfig appConfig) throws IOException {
+            IApplicationConfig appConfig) {
         super(serverCtx, appConfig, new HyracksThreadFactory(nodeId));
         this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
@@ -59,6 +60,7 @@
         this.ncs = ncs;
         this.sdh = lccm::dumpState;
         this.tracer = new Tracer(nodeId, ncs.getConfiguration().getTraceCategories(), new TraceCategoryRegistry());
+        this.distributedState = new ConcurrentHashMap<>();
     }
 
     @Override
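With distributedState now a ConcurrentHashMap populated per CC at registration time (see the NodeControllerService hunk above), consumers key their reads by CcId. A minimal sketch, assuming getDistributedState() is reachable through the NC service-context interface as it is in the hunk above; the helper class is hypothetical.

// Sketch only; the unchecked cast mirrors getDistributedState() in NodeControllerService.
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.control.CcId;

final class DistributedStateLookup {
    @SuppressWarnings("unchecked")
    static Serializable of(INCServiceContext ctx, CcId ccId) {
        ConcurrentHashMap<CcId, Serializable> perCcState =
                (ConcurrentHashMap<CcId, Serializable>) ctx.getDistributedState();
        return perCcState.get(ccId);
    }
}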
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 54a171d..2742aaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.util.file.FileUtil;
 
 public class IOManager implements IIOManager {
     /*
@@ -72,7 +73,11 @@
         workspaces = new ArrayList<>();
         for (IODeviceHandle d : ioDevices) {
             if (d.getWorkspace() != null) {
-                new File(d.getMount(), d.getWorkspace()).mkdirs();
+                try {
+                    FileUtil.forceMkdirs(new File(d.getMount(), d.getWorkspace()));
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
+                }
                 workspaces.add(d);
             }
         }
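The old code discarded the boolean that File.mkdirs() uses to report failure, so an unusable io device mount only surfaced later as obscure file errors; forceMkdirs turns the failure into an exception at startup (wrapped as HyracksDataException) and, per FileUtil further down, retries once under a lock to ride out the known raciness of concurrent mkdirs() calls. A small self-contained illustration of what the removed line silently swallowed, with hypothetical paths and class name:

// Illustrative only: mkdirs() reports trouble solely through its return value,
// which the replaced IOManager line discarded.
import java.io.File;
import java.io.IOException;

final class MkdirsReturnValue {
    static void createWorkspace(File mount, String workspace) throws IOException {
        File dir = new File(mount, workspace);
        if (!dir.mkdirs() && !dir.isDirectory()) {
            // the old IOManager never performed this check; the new code surfaces
            // the condition as a HyracksDataException instead
            throw new IOException("could not create workspace " + dir);
        }
    }
}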
diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
index c521f08..8de30ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
@@ -79,6 +79,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
index d6e175e..1b0093d 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
@@ -19,10 +19,18 @@
 package org.apache.hyracks.util.file;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 public class FileUtil {
 
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final Object LOCK = new Object();
+
     private FileUtil() {
     }
 
@@ -30,6 +38,19 @@
         return joinPath(File.separatorChar, elements);
     }
 
+    public static void forceMkdirs(File dir) throws IOException {
+        File canonicalDir = dir.getCanonicalFile();
+        try {
+            FileUtils.forceMkdir(canonicalDir);
+        } catch (IOException e) {
+            LOGGER.warn("failure to create directory {}, retrying", dir, e);
+            synchronized (LOCK) {
+                FileUtils.forceMkdir(canonicalDir);
+            }
+        }
+
+    }
+
     static String joinPath(char separatorChar, String... elements) {
         final String separator = String.valueOf(separatorChar);
         final String escapedSeparator = Pattern.quote(separator);