[NO ISSUE][CLUS] Include NC Local Counters in Startup Message

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Include NC local counters in the NC startup tasks
  completion completion message.
- Ensure no NC is marked as active without its local
  counters reported.
- Remove the need for individual NC local counters
  message.
- Clean up ITxnIdFactory and IResourceIdManager APIs.

Change-Id: Ief8b9d43783ea22810c6fdb29947a1284e32daee
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2767
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
deleted file mode 100644
index 53b13e8..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc.task;
-
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.runtime.message.ReportLocalCountersMessage;
-import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class ReportLocalCountersTask implements INCLifecycleTask {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
-        ReportLocalCountersMessage.send(ccId, (NodeControllerService) cs);
-    }
-
-    @Override
-    public String toString() {
-        return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
-    }
-}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 043e02e..961593f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -31,7 +31,6 @@
 import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
 import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
 import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
 import org.apache.asterix.app.nc.task.UpdateNodeStatusTask;
@@ -79,7 +78,7 @@
     @Override
     public void notifyNodeFailure(String nodeId) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
-        clusterManager.updateNodePartitions(nodeId, false);
+        clusterManager.updateNodeState(nodeId, false, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
@@ -112,20 +111,18 @@
     private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
-        if (!tasks.isEmpty()) {
-            RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
-            try {
-                messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
-            } catch (Exception e) {
-                throw HyracksDataException.create(e);
-            }
+        RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
+        try {
+            messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
         }
     }
 
     private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(msg.getNodeId());
         if (msg.isSuccess()) {
-            clusterManager.updateNodePartitions(msg.getNodeId(), true);
+            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
             if (msg.getNodeId().equals(metadataNodeId)) {
                 clusterManager.updateMetadataNode(metadataNodeId, true);
             }
@@ -156,7 +153,6 @@
         if (metadataNode) {
             tasks.add(new BindMetadataNodeTask());
         }
-        tasks.add(new ReportLocalCountersTask());
         return tasks;
     }
 
@@ -203,7 +199,6 @@
             tasks.add(new ExportMetadataNodeTask(true));
             tasks.add(new BindMetadataNodeTask());
         }
-        tasks.add(new ReportLocalCountersTask());
         tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE));
         return tasks;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 5ac3a0c..21022ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import org.apache.asterix.common.utils.NcLocalCounters;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -29,14 +30,16 @@
     private final String nodeId;
     private final boolean success;
     private Throwable exception;
+    private final NcLocalCounters localCounters;
 
-    public NCLifecycleTaskReportMessage(String nodeId, boolean success) {
+    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
         this.nodeId = nodeId;
         this.success = success;
+        this.localCounters = localCounters;
     }
 
     @Override
-    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException {
         appCtx.getNcLifecycleCoordinator().process(this);
     }
 
@@ -56,6 +59,10 @@
         this.exception = exception;
     }
 
+    public NcLocalCounters getLocalCounters() {
+        return localCounters;
+    }
+
     @Override
     public MessageType getType() {
         return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 868c2ad..2d2fe97 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 
+import org.apache.asterix.common.utils.NcLocalCounters;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.messaging.CcIdentifiedMessage;
@@ -28,6 +29,7 @@
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -68,7 +70,9 @@
                 success = false;
                 exception = e;
             }
-            NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success);
+            NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
+                    (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
+            NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
             result.setException(exception);
             try {
                 broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 4beb44a..9d367ac 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -31,14 +31,19 @@
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.utils.NcLocalCounters;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
+import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.application.CCServiceContext;
 import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
 import org.apache.hyracks.control.common.config.ConfigManager;
@@ -194,8 +199,7 @@
 
     private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
             throws HyracksDataException {
-        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true);
-        applicationContext.getResourceIdManager().report(nodeId, 0);
+        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
         applicationContext.getNcLifecycleCoordinator().process(msg);
     }
 
@@ -204,7 +208,12 @@
         ConfigManager configManager = new ConfigManager(null);
         IApplicationConfig applicationConfig = new ConfigManagerApplicationConfig(configManager);
         ICCServiceContext iccServiceContext = Mockito.mock(CCServiceContext.class);
+        final ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class);
+        JobIdFactory jobIdFactory = new JobIdFactory(CcId.valueOf(0));
+        Mockito.when(ccs.getJobIdFactory()).thenReturn(jobIdFactory);
         Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
+        Mockito.when(iccServiceContext.getControllerService()).thenReturn(ccs);
+
         Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
 
         NcLifecycleCoordinator coordinator =
@@ -225,6 +234,9 @@
         IGlobalRecoveryManager globalRecoveryManager = Mockito.mock(IGlobalRecoveryManager.class);
         Mockito.when(globalRecoveryManager.isRecoveryCompleted()).thenReturn(true);
         Mockito.when(ccApplicationContext.getGlobalRecoveryManager()).thenReturn(globalRecoveryManager);
+
+        BulkTxnIdFactory bulkTxnIdFactory = new BulkTxnIdFactory();
+        Mockito.when(ccApplicationContext.getTxnIdFactory()).thenReturn(bulkTxnIdFactory);
         return ccApplicationContext;
     }
 
@@ -238,4 +250,12 @@
         Mockito.when(metadataProperties.getNodePartitions()).thenReturn(nodePartitionsMap);
         return metadataProperties;
     }
+
+    private NcLocalCounters mockLocalCounters() {
+        final NcLocalCounters localCounters = Mockito.mock(NcLocalCounters.class);
+        Mockito.when(localCounters.getMaxJobId()).thenReturn(1000L);
+        Mockito.when(localCounters.getMaxResourceId()).thenReturn(1000L);
+        Mockito.when(localCounters.getMaxTxnId()).thenReturn(1000L);
+        return localCounters;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index ee7fe5c..72c4099 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -287,5 +287,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-control-nc</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index dda9ffd..5e99eec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.NcLocalCounters;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.config.IOption;
@@ -59,9 +60,10 @@
      *
      * @param nodeId
      * @param active
+     * @param ncLocalCounters
      * @throws HyracksDataException
      */
-    void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException;
+    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
 
     /**
      * Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index ce49ccf6..d36d383 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -18,14 +18,12 @@
  */
 package org.apache.asterix.common.transactions;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
 public interface IResourceIdManager {
 
     long createResourceId();
 
     boolean reported(String nodeId);
 
-    void report(String nodeId, long maxResourceId) throws HyracksDataException;
+    void report(String nodeId, long maxResourceId);
 
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
index be4a1f8..5c28f3f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
@@ -36,7 +36,7 @@
      * @param id
      *            the value to ensure future created transaction ids are larger than
      */
-    void ensureMinimumId(long id) throws AlgebricksException;
+    void ensureMinimumId(long id);
 
     /**
      * The highest transaction id this factory has created
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
new file mode 100644
index 0000000..7cd61d8
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.io.Serializable;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class NcLocalCounters implements Serializable {
+
+    private final long maxResourceId;
+    private final long maxTxnId;
+    private final long maxJobId;
+
+    private NcLocalCounters(long maxResourceId, long maxTxnId, long maxJobId) {
+        this.maxResourceId = maxResourceId;
+        this.maxTxnId = maxTxnId;
+        this.maxJobId = maxJobId;
+    }
+
+    public static NcLocalCounters collect(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
+        final INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
+        long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
+                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+        long maxTxnId = appContext.getMaxTxnId();
+        long maxJobId = ncs.getMaxJobId(ccId);
+        return new NcLocalCounters(maxResourceId, maxTxnId, maxJobId);
+    }
+
+    public long getMaxResourceId() {
+        return maxResourceId;
+    }
+
+    public long getMaxTxnId() {
+        return maxTxnId;
+    }
+
+    public long getMaxJobId() {
+        return maxJobId;
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
index 6b3b6a0f..f530afe 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
@@ -62,7 +62,7 @@
     }
 
     @Override
-    public void ensureMinimumId(long id) throws AlgebricksException {
+    public void ensureMinimumId(long id) {
         throw new UnsupportedOperationException();
     }
 
diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml
index 22d1bfc..4cb4aad 100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@ -105,10 +105,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-control-nc</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>algebricks-common</artifactId>
     </dependency>
     <dependency>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
deleted file mode 100644
index fe9a5b8..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class ReportLocalCountersMessage implements ICcAddressedMessage {
-    private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final long maxResourceId;
-    private final long maxTxnId;
-    private final long maxJobId;
-    private final String src;
-
-    public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId, long maxJobId) {
-        this.src = src;
-        this.maxResourceId = maxResourceId;
-        this.maxTxnId = maxTxnId;
-        this.maxJobId = maxJobId;
-    }
-
-    @Override
-    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-        try {
-            appCtx.getTxnIdFactory().ensureMinimumId(maxTxnId);
-        } catch (AlgebricksException e) {
-            throw HyracksDataException.create(e);
-        }
-        resourceIdManager.report(src, maxResourceId);
-        ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobIdFactory()
-                .setMaxJobId(maxJobId);
-    }
-
-    public static void send(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
-        INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
-        long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
-                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
-        long maxTxnId = appContext.getMaxTxnId();
-        long maxJobId = ncs.getMaxJobId(ccId);
-        ReportLocalCountersMessage countersMessage =
-                new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId, maxJobId);
-        try {
-            ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(ccId, countersMessage);
-        } catch (Exception e) {
-            LOGGER.log(Level.ERROR, "Unable to report local counters", e);
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return ReportLocalCountersMessage.class.getSimpleName();
-    }
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
deleted file mode 100644
index 51f53e7..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.CcIdentifiedMessage;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class ReportLocalCountersRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ReportLocalCountersMessage.send(getCcId(),
-                (NodeControllerService) appCtx.getServiceContext().getControllerService());
-    }
-
-    @Override
-    public String toString() {
-        return ReportLocalCountersRequestMessage.class.getSimpleName();
-    }
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 087913f..c767c52 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -49,7 +49,6 @@
                             new Exception("Cannot generate global resource id when cluster is not active."));
                 } else {
                     response.setException(new Exception("One or more nodes has not reported max resource id."));
-                    requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
                 }
             }
             broker.sendApplicationMessageToNC(response, src);
@@ -58,17 +57,6 @@
         }
     }
 
-    private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager,
-            ICCMessageBroker broker) throws Exception {
-        Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(true);
-        ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage();
-        for (String nodeId : getParticipantNodes) {
-            if (!resourceIdManager.reported(nodeId)) {
-                broker.sendApplicationMessageToNC(msg, nodeId);
-            }
-        }
-    }
-
     @Override
     public String toString() {
         return ResourceIdRequestMessage.class.getSimpleName();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 6d3077e..5bcd5aa 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -24,7 +24,6 @@
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdManager implements IResourceIdManager {
 
@@ -48,11 +47,8 @@
     }
 
     @Override
-    public void report(String nodeId, long maxResourceId) throws HyracksDataException {
+    public void report(String nodeId, long maxResourceId) {
         globalResourceId.updateAndGet(prev -> Math.max(maxResourceId, prev));
-        if (reportedNodes.add(nodeId)) {
-            csm.refreshState();
-        }
+        reportedNodes.add(nodeId);
     }
-
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
index 542bc17..296ce9c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
@@ -24,7 +24,7 @@
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.TxnId;
 
-class BulkTxnIdFactory implements ITxnIdFactory {
+public class BulkTxnIdFactory implements ITxnIdFactory {
 
     private final AtomicLong maxId = new AtomicLong();
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 8539fa4..8d3187b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -37,12 +37,14 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.common.utils.NcLocalCounters;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -133,8 +135,9 @@
     }
 
     @Override
-    public synchronized void updateNodePartitions(String nodeId, boolean active) {
+    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
         if (active) {
+            updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
         } else {
             participantNodes.remove(nodeId);
@@ -182,15 +185,6 @@
             setState(ClusterState.UNUSABLE);
             return;
         }
-
-        IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-        for (String node : participantNodes) {
-            if (!resourceIdManager.reported(node)) {
-                LOGGER.info("Partitions are ready but {} has not yet registered its max resource id...", node);
-                setState(ClusterState.UNUSABLE);
-                return;
-            }
-        }
         // the metadata bootstrap & global recovery must be complete before the cluster can be active
         if (metadataNodeActive) {
             if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
@@ -452,6 +446,14 @@
         return metadataPartition;
     }
 
+    private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
+        final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+        resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
+        appCtx.getTxnIdFactory().ensureMinimumId(localCounters.getMaxTxnId());
+        ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobIdFactory()
+                .setMaxJobId(localCounters.getMaxJobId());
+    }
+
     private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) {
         ConfigManager configManager =
                 ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager();