[NO ISSUE][TX] Ensure TxnIdFactory Value is Initialized

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Report local max txn id after node registration.
- Add node status BOOTING.
- Distinguish between node first time registration and
  registration after restarting by using NodeStatus
  BOOTING to respond with the proper node post
  registration tasks.
- Rename node status ALIVE -> ACTIVE.
- Rename StartupTask* to RegistrationTasks*

Change-Id: I6899c9e7d6e744ca92d0108556e086a23639d78b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2151
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7b08f68..e77d535 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -481,7 +481,9 @@
 
     @Override
     public synchronized void unexportMetadataNodeStub() throws RemoteException {
-        UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false);
+        if (metadataNodeStub != null) {
+            UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false);
+        }
         metadataNodeStub = null;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
similarity index 87%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
index 22d3cde..86f7d1c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
@@ -19,18 +19,18 @@
 package org.apache.asterix.app.nc.task;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.asterix.runtime.message.ReportLocalCountersMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReportMaxResourceIdTask implements INCLifecycleTask {
+public class ReportLocalCountersTask implements INCLifecycleTask {
 
     private static final long serialVersionUID = 1L;
 
     @Override
     public void perform(IControllerService cs) throws HyracksDataException {
-        ReportMaxResourceIdMessage.send((NodeControllerService) cs);
+        ReportLocalCountersMessage.send((NodeControllerService) cs);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index 4ac1305..23f225e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -33,7 +33,7 @@
 import org.apache.asterix.app.nc.task.CheckpointTask;
 import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
 import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
 import org.apache.asterix.app.nc.task.StartFailbackTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
 import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
@@ -43,8 +43,8 @@
 import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
 import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
 import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage;
 import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage;
 import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage;
@@ -431,10 +431,10 @@
     @Override
     public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
         switch (message.getType()) {
-            case STARTUP_TASK_REQUEST:
-                process((StartupTaskRequestMessage) message);
+            case REGISTRATION_TASKS_REQUEST:
+                process((RegistrationTasksRequestMessage) message);
                 break;
-            case STARTUP_TASK_RESULT:
+            case REGISTRATION_TASKS_RESULT:
                 process((NCLifecycleTaskReportMessage) message);
                 break;
             case TAKEOVER_PARTITION_RESPONSE:
@@ -483,7 +483,7 @@
         currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
     }
 
-    private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+    private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         final SystemState state = msg.getState();
         List<INCLifecycleTask> tasks;
@@ -493,7 +493,7 @@
             // failed node returned. Need to start failback process
             tasks = buildFailbackStartupSequence();
         }
-        StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+        RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
         } catch (Exception e) {
@@ -504,7 +504,7 @@
     private List<INCLifecycleTask> buildFailbackStartupSequence() {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new StartFailbackTask());
-        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new ReportLocalCountersTask());
         tasks.add(new StartLifecycleComponentsTask());
         return tasks;
     }
@@ -517,7 +517,7 @@
             tasks.add(new MetadataBootstrapTask());
         }
         tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
-        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new ReportLocalCountersTask());
         tasks.add(new CheckpointTask());
         tasks.add(new StartLifecycleComponentsTask());
         if (isMetadataNode) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
index 1b57403..3341813 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -35,14 +35,14 @@
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
 import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
 import org.apache.asterix.app.nc.task.RemoteRecoveryTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
 import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
 import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
 import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage;
 import org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -123,10 +123,10 @@
     @Override
     public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
         switch (message.getType()) {
-            case STARTUP_TASK_REQUEST:
-                process((StartupTaskRequestMessage) message);
+            case REGISTRATION_TASKS_REQUEST:
+                process((RegistrationTasksRequestMessage) message);
                 break;
-            case STARTUP_TASK_RESULT:
+            case REGISTRATION_TASKS_RESULT:
                 process((NCLifecycleTaskReportMessage) message);
                 break;
             case REPLAY_LOGS_RESPONSE:
@@ -150,7 +150,7 @@
         }
     }
 
-    private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+    private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         final SystemState state = msg.getState();
         final boolean isParticipant = replicationStrategy.isParticipant(nodeId);
@@ -160,7 +160,7 @@
         } else {
             tasks = buildParticipantStartupSequence(nodeId, state);
         }
-        StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+        RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
         } catch (Exception e) {
@@ -199,7 +199,7 @@
             tasks.add(rt);
         }
         tasks.add(new ExternalLibrarySetupTask(false));
-        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new ReportLocalCountersTask());
         tasks.add(new CheckpointTask());
         tasks.add(new StartLifecycleComponentsTask());
         return tasks;
@@ -234,7 +234,7 @@
             tasks.add(new MetadataBootstrapTask());
         }
         tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
-        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new ReportLocalCountersTask());
         tasks.add(new CheckpointTask());
         tasks.add(new StartLifecycleComponentsTask());
         if (isMetadataNode) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
index b9ea135..a273845 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -32,11 +32,11 @@
 import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
 import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
 import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -48,12 +48,13 @@
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
 
     private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName());
-    IClusterStateManager clusterManager;
+    private IClusterStateManager clusterManager;
     private String metadataNodeId;
     private Set<String> pendingStartupCompletionNodes = new HashSet<>();
     private ICCMessageBroker messageBroker;
@@ -76,10 +77,10 @@
     @Override
     public void process(INCLifecycleMessage message) throws HyracksDataException {
         switch (message.getType()) {
-            case STARTUP_TASK_REQUEST:
-                process((StartupTaskRequestMessage) message);
+            case REGISTRATION_TASKS_REQUEST:
+                process((RegistrationTasksRequestMessage) message);
                 break;
-            case STARTUP_TASK_RESULT:
+            case REGISTRATION_TASKS_RESULT:
                 process((NCLifecycleTaskReportMessage) message);
                 break;
             default:
@@ -100,10 +101,10 @@
         metadataNodeId = clusterManager.getCurrentMetadataNodeId();
     }
 
-    private void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+    private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
-        List<INCLifecycleTask> tasks = buildNCStartupSequence(msg.getNodeId(), msg.getState());
-        StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+        List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+        RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
         } catch (Exception e) {
@@ -126,7 +127,16 @@
         }
     }
 
-    private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) {
+    private List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+        LOGGER.log(Level.INFO, () -> "Building registration tasks for node: " + nodeId + " with state: " + state);
+        final boolean isMetadataNode = nodeId.equals(metadataNodeId);
+        if (nodeStatus == NodeStatus.ACTIVE) {
+            /*
+             * if the node state is already ACTIVE then it completed
+             * booting and just re-registering with a new/failed CC.
+             */
+            return buildActiveNCRegTasks(isMetadataNode);
+        }
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         if (state == SystemState.CORRUPTED) {
             //need to perform local recovery for node partitions
@@ -134,12 +144,11 @@
                     .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
             tasks.add(rt);
         }
-        final boolean isMetadataNode = nodeId.equals(metadataNodeId);
         if (isMetadataNode) {
             tasks.add(new MetadataBootstrapTask());
         }
         tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
-        tasks.add(new ReportMaxResourceIdTask());
+        tasks.add(new ReportLocalCountersTask());
         tasks.add(new CheckpointTask());
         tasks.add(new StartLifecycleComponentsTask());
         if (isMetadataNode) {
@@ -147,4 +156,15 @@
         }
         return tasks;
     }
+
+    private List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) {
+        final List<INCLifecycleTask> tasks = new ArrayList<>();
+        if (metadataNode) {
+            // need to unbind from old distributed state then rebind to new one
+            tasks.add(new BindMetadataNodeTask(false));
+            tasks.add(new BindMetadataNodeTask(true));
+        }
+        tasks.add(new ReportLocalCountersTask());
+        return tasks;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 2b32e1f..b654fd8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -58,6 +58,6 @@
 
     @Override
     public MessageType getType() {
-        return MessageType.STARTUP_TASK_RESULT;
+        return MessageType.REGISTRATION_TASKS_RESULT;
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
similarity index 69%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index 21dee9c..075c415 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -26,27 +26,32 @@
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class StartupTaskRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
+public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
 
-    private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(RegistrationTasksRequestMessage.class.getName());
     private static final long serialVersionUID = 1L;
     private final SystemState state;
     private final String nodeId;
+    private final NodeStatus nodeStatus;
 
-    public StartupTaskRequestMessage(String nodeId, SystemState state) {
+    public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state) {
         this.state = state;
         this.nodeId = nodeId;
+        this.nodeStatus = nodeStatus;
     }
 
-    public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException {
+    public static void send(NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState)
+            throws HyracksDataException {
         try {
-            StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState);
+            RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+                    systemState);
             ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg);
         } catch (Exception e) {
-            LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
+            LOGGER.log(Level.SEVERE, "Unable to send RegistrationTasksRequestMessage to CC", e);
             throw HyracksDataException.create(e);
         }
     }
@@ -64,8 +69,13 @@
         return nodeId;
     }
 
+    public NodeStatus getNodeStatus() {
+        return nodeStatus;
+    }
+
     @Override
     public MessageType getType() {
-        return MessageType.STARTUP_TASK_REQUEST;
+        return MessageType.REGISTRATION_TASKS_REQUEST;
     }
+
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
similarity index 90%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index b941343..13525e3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -32,14 +32,14 @@
 import org.apache.hyracks.control.nc.NCShutdownHook;
 import org.apache.hyracks.util.ExitUtil;
 
-public class StartupTaskResponseMessage implements INCLifecycleMessage, INcAddressedMessage {
+public class RegistrationTasksResponseMessage implements INCLifecycleMessage, INcAddressedMessage {
 
-    private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(RegistrationTasksResponseMessage.class.getName());
     private static final long serialVersionUID = 1L;
     private final String nodeId;
     private final List<INCLifecycleTask> tasks;
 
-    public StartupTaskResponseMessage(String nodeId, List<INCLifecycleTask> tasks) {
+    public RegistrationTasksResponseMessage(String nodeId, List<INCLifecycleTask> tasks) {
         this.nodeId = nodeId;
         this.tasks = tasks;
     }
@@ -88,6 +88,6 @@
 
     @Override
     public MessageType getType() {
-        return MessageType.STARTUP_TASK_RESPONSE;
+        return MessageType.REGISTRATION_TASKS_RESPONSE;
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 3d7f870..a18535d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -26,7 +26,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.AsterixExtension;
@@ -48,6 +48,7 @@
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
@@ -67,7 +68,6 @@
     private String nodeId;
     private boolean stopInitiated;
     private boolean startupCompleted;
-    private SystemState systemState;
     protected WebManager webManager;
 
     @Override
@@ -117,9 +117,8 @@
         this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
 
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        systemState = recoveryMgr.getSystemState();
-
-        if (systemState == SystemState.PERMANENT_DATA_LOSS) {
+        final SystemState stateOnStartup = recoveryMgr.getSystemState();
+        if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("System state: " + SystemState.PERMANENT_DATA_LOSS);
                 LOGGER.info("Node ID: " + nodeId);
@@ -187,20 +186,27 @@
 
         // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
         final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
-        if (systemState == SystemState.PERMANENT_DATA_LOSS
-                && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
-            systemState = SystemState.BOOTSTRAPPING;
+        IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+        SystemState state = recoveryMgr.getSystemState();
+        if (state == SystemState.PERMANENT_DATA_LOSS && (nodeProperties.isInitialRun() || nodeProperties
+                .isVirtualNc())) {
+            state = SystemState.BOOTSTRAPPING;
         }
-        // Request startup tasks from CC
-        StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+        // Request registration tasks from CC
+        RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
+                NodeStatus.BOOTING, state);
         startupCompleted = true;
     }
 
     @Override
     public void onRegisterNode() throws Exception {
         if (startupCompleted) {
-            // Request startup tasks from CC
-            StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+            /*
+             * If the node completed its startup before, then this is a re-registration with
+             * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE
+             */
+            RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
+                    NodeStatus.ACTIVE, SystemState.HEALTHY);
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
index 87b0856..cb9fa8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
@@ -22,16 +22,16 @@
 
 public interface INCLifecycleMessage extends IMessage {
 
-    public enum MessageType {
+    enum MessageType {
         REPLAY_LOGS_REQUEST,
         REPLAY_LOGS_RESPONSE,
         PREPARE_FAILBACK_REQUEST,
         PREPARE_FAILBACK_RESPONSE,
         COMPLETE_FAILBACK_REQUEST,
         COMPLETE_FAILBACK_RESPONSE,
-        STARTUP_TASK_REQUEST,
-        STARTUP_TASK_RESPONSE,
-        STARTUP_TASK_RESULT,
+        REGISTRATION_TASKS_REQUEST,
+        REGISTRATION_TASKS_RESPONSE,
+        REGISTRATION_TASKS_RESULT,
         TAKEOVER_PARTITION_REQUEST,
         TAKEOVER_PARTITION_RESPONSE,
         TAKEOVER_METADATA_NODE_REQUEST,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
similarity index 76%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index 277c0ba..3f8ced8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -27,27 +27,27 @@
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReportMaxResourceIdMessage implements ICcAddressedMessage {
+public class ReportLocalCountersMessage implements ICcAddressedMessage {
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(ReportLocalCountersMessage.class.getName());
     private final long maxResourceId;
+    private final long maxTxnId;
     private final String src;
 
-    public ReportMaxResourceIdMessage(String src, long maxResourceId) {
+    public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId) {
         this.src = src;
         this.maxResourceId = maxResourceId;
-    }
-
-    public long getMaxResourceId() {
-        return maxResourceId;
+        this.maxTxnId = maxTxnId;
     }
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
         IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+        TxnIdFactory.ensureMinimumId(maxTxnId);
         resourceIdManager.report(src, maxResourceId);
     }
 
@@ -56,17 +56,19 @@
         INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
         long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
-        ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
+        long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+        ReportLocalCountersMessage countersMessage =
+                new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId);
         try {
-            ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
+            ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(countersMessage);
         } catch (Exception e) {
-            LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
+            LOGGER.log(Level.SEVERE, "Unable to report local counters", e);
             throw HyracksDataException.create(e);
         }
     }
 
     @Override
     public String toString() {
-        return ReportMaxResourceIdMessage.class.getSimpleName();
+        return ReportLocalCountersMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
similarity index 87%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
index a43376d..785ad2f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
@@ -23,16 +23,16 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReportMaxResourceIdRequestMessage implements INcAddressedMessage {
+public class ReportLocalCountersRequestMessage implements INcAddressedMessage {
     private static final long serialVersionUID = 1L;
 
     @Override
     public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ReportMaxResourceIdMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService());
+        ReportLocalCountersMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService());
     }
 
     @Override
     public String toString() {
-        return ReportMaxResourceIdRequestMessage.class.getSimpleName();
+        return ReportLocalCountersRequestMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index decc1a9..a2f4aa1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -61,7 +61,7 @@
     private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager,
             ICCMessageBroker broker) throws Exception {
         Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes();
-        ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
+        ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage();
         for (String nodeId : getParticipantNodes) {
             if (!resourceIdManager.reported(nodeId)) {
                 broker.sendApplicationMessageToNC(msg, nodeId);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
index 71d7f56..eb59e74 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
@@ -37,6 +37,6 @@
     }
 
     public static void ensureMinimumId(long id) {
-        TxnIdFactory.id.set(id);
+        TxnIdFactory.id.updateAndGet(current -> Math.max(current, id));
     }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
index b84f1f2..10a9a3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.api.client;
 
 public enum NodeStatus {
-    ALIVE,
+    ACTIVE,
+    BOOTING,
     DEAD
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 3cd6235..4928564 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -136,7 +136,7 @@
     public Map<String, NodeControllerInfo> getNodeControllerInfoMap() {
         Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
         nodeRegistry.forEach(
-                (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ALIVE, ncState.getDataPort(),
+                (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(),
                         ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores())));
         return result;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
index 445a15c..bb28c79 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -97,10 +97,10 @@
         Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.",
                 dataPort, resultPort, messagingPort);
         ncNameToNcInfos.put("nc7",
-                new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", dataPort),
+                new NodeControllerInfo("nc7", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.7", dataPort),
                         new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2));
         ncNameToNcInfos.put("nc12",
-                new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", dataPort),
+                new NodeControllerInfo("nc12", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.12", dataPort),
                         new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2));
 
         InputSplit[] fileSplits = new InputSplit[12];
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index c3d86e8..1814e85 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -135,7 +135,7 @@
             String ncId = ncNamePrefix + i;
             String ncAddress = addressPrefix + i;
             ncNameToNcInfos.put(ncId,
-                    new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort),
+                    new NodeControllerInfo(ncId, NodeStatus.ACTIVE, new NetworkAddress(ncAddress, netPort),
                             new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort), 2));
         }
         return ncNameToNcInfos;