[NO ISSUE][CLUS] Unify Code Path of Requesting Startup Tasks

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Remove special case of requesting startup tasks on
  NC startup complete.
- Handle case where CC might fail before sending startup
  tasks to NC.

Change-Id: Ieb89d6f293b0e958c3f141afc6f1db372cee7c91
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2401
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
new file mode 100644
index 0000000..a1e11c2
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class UpdateNodeStatusTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final NodeStatus status;
+
+    public UpdateNodeStatusTask(NodeStatus status) {
+        this.status = status;
+    }
+
+    @Override
+    public void perform(CcId ccId, IControllerService cs) {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        ncs.setNodeStatus(status);
+    }
+
+    @Override
+    public String toString() {
+        return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 25e768d..844851a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
 import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.nc.task.UpdateNodeStatusTask;
 import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
 import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
 import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
@@ -145,6 +146,7 @@
             return buildActiveNCRegTasks(isMetadataNode);
         }
         final List<INCLifecycleTask> tasks = new ArrayList<>();
+        tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE));
         if (state == SystemState.CORRUPTED) {
             //need to perform local recovery for node partitions
             LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index b0382f7..3a8a92f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -73,7 +73,6 @@
     private INcApplicationContext runtimeContext;
     private String nodeId;
     private boolean stopInitiated;
-    private boolean startupCompleted;
     protected WebManager webManager;
 
     @Override
@@ -190,33 +189,16 @@
         // configure servlets after joining the cluster, so we can create HyracksClientConnection
         configureServers();
         webManager.start();
-
-        // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
-        final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
-        IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        SystemState state = recoveryMgr.getSystemState();
-        if (state == SystemState.PERMANENT_DATA_LOSS
-                && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
-            state = SystemState.BOOTSTRAPPING;
-        }
-        // Request registration tasks from CC (we only do this from our primary CC, in the case of multiple CCs)
-        final NodeControllerService ncControllerService = (NodeControllerService) ncServiceCtx.getControllerService();
-        RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryCcId(), ncControllerService,
-                NodeStatus.BOOTING, state);
-        startupCompleted = true;
     }
 
     @Override
-    public void onRegisterNode(CcId ccId) throws Exception {
-        if (startupCompleted) {
-            /*
-             * If the node completed its startup before, then this is a re-registration with
-             * the primary (or supplemental) CC and therefore the system state should be HEALTHY and the node status
-             * is ACTIVE
-             */
-            RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
-                    NodeStatus.ACTIVE, SystemState.HEALTHY);
-        }
+    public synchronized void onRegisterNode(CcId ccId) throws Exception {
+        final NodeControllerService ncs = (NodeControllerService) ncServiceCtx.getControllerService();
+        final NodeStatus currentStatus = ncs.getNodeStatus();
+        final SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId)
+                ? getCurrentSystemState() : SystemState.HEALTHY;
+        RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
+                currentStatus, systemState);
     }
 
     @Override
@@ -259,4 +241,20 @@
             return devices.get(ioDeviceIndex);
         };
     }
+
+    private boolean isPendingStartupTasks(NodeStatus nodeStatus, CcId primaryCc, CcId registeredCc) {
+        return nodeStatus == NodeStatus.BOOTING && (primaryCc == null || primaryCc.equals(registeredCc));
+    }
+
+    private SystemState getCurrentSystemState() {
+        final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
+        IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+        SystemState state = recoveryMgr.getSystemState();
+        // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
+        if (state == SystemState.PERMANENT_DATA_LOSS
+                && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
+            state = SystemState.BOOTSTRAPPING;
+        }
+        return state;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0f40b60..0ccef1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -48,6 +48,7 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
@@ -62,6 +63,7 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
@@ -95,7 +97,6 @@
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 import org.apache.hyracks.util.ExitUtil;
-import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.util.PidHelper;
 import org.apache.hyracks.util.trace.ITracer;
 import org.apache.hyracks.util.trace.Tracer;
@@ -183,6 +184,7 @@
     private final ConfigManager configManager;
 
     private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>();
+    private NodeStatus status = NodeStatus.BOOTING;
 
     static {
         ExitUtil.init();
@@ -627,6 +629,14 @@
         return workQueue;
     }
 
+    public synchronized NodeStatus getNodeStatus() {
+        return status;
+    }
+
+    public synchronized void setNodeStatus(NodeStatus status) {
+        this.status = status;
+    }
+
     private class HeartbeatTask implements Runnable {
         private final Semaphore delayBlock = new Semaphore(0);
         private final IClusterController cc;