[NO ISSUE][CLUS] Unify Code Path of Requesting Startup Tasks
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Remove special case of requesting startup tasks on
NC startup complete.
- Handle case where CC might fail before sending startup
tasks to NC.
Change-Id: Ieb89d6f293b0e958c3f141afc6f1db372cee7c91
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2401
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
new file mode 100644
index 0000000..a1e11c2
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class UpdateNodeStatusTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+ private final NodeStatus status;
+
+ public UpdateNodeStatusTask(NodeStatus status) {
+ this.status = status;
+ }
+
+ @Override
+ public void perform(CcId ccId, IControllerService cs) {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ ncs.setNodeStatus(status);
+ }
+
+ @Override
+ public String toString() {
+ return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 25e768d..844851a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -34,6 +34,7 @@
import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.nc.task.UpdateNodeStatusTask;
import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
@@ -145,6 +146,7 @@
return buildActiveNCRegTasks(isMetadataNode);
}
final List<INCLifecycleTask> tasks = new ArrayList<>();
+ tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE));
if (state == SystemState.CORRUPTED) {
//need to perform local recovery for node partitions
LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index b0382f7..3a8a92f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -73,7 +73,6 @@
private INcApplicationContext runtimeContext;
private String nodeId;
private boolean stopInitiated;
- private boolean startupCompleted;
protected WebManager webManager;
@Override
@@ -190,33 +189,16 @@
// configure servlets after joining the cluster, so we can create HyracksClientConnection
configureServers();
webManager.start();
-
- // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
- final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
- IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- SystemState state = recoveryMgr.getSystemState();
- if (state == SystemState.PERMANENT_DATA_LOSS
- && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
- state = SystemState.BOOTSTRAPPING;
- }
- // Request registration tasks from CC (we only do this from our primary CC, in the case of multiple CCs)
- final NodeControllerService ncControllerService = (NodeControllerService) ncServiceCtx.getControllerService();
- RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryCcId(), ncControllerService,
- NodeStatus.BOOTING, state);
- startupCompleted = true;
}
@Override
- public void onRegisterNode(CcId ccId) throws Exception {
- if (startupCompleted) {
- /*
- * If the node completed its startup before, then this is a re-registration with
- * the primary (or supplemental) CC and therefore the system state should be HEALTHY and the node status
- * is ACTIVE
- */
- RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
- NodeStatus.ACTIVE, SystemState.HEALTHY);
- }
+ public synchronized void onRegisterNode(CcId ccId) throws Exception {
+ final NodeControllerService ncs = (NodeControllerService) ncServiceCtx.getControllerService();
+ final NodeStatus currentStatus = ncs.getNodeStatus();
+ final SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId)
+ ? getCurrentSystemState() : SystemState.HEALTHY;
+ RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
+ currentStatus, systemState);
}
@Override
@@ -259,4 +241,20 @@
return devices.get(ioDeviceIndex);
};
}
+
+ private boolean isPendingStartupTasks(NodeStatus nodeStatus, CcId primaryCc, CcId registeredCc) {
+ return nodeStatus == NodeStatus.BOOTING && (primaryCc == null || primaryCc.equals(registeredCc));
+ }
+
+ private SystemState getCurrentSystemState() {
+ final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
+ IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+ SystemState state = recoveryMgr.getSystemState();
+ // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
+ if (state == SystemState.PERMANENT_DATA_LOSS
+ && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
+ state = SystemState.BOOTSTRAPPING;
+ }
+ return state;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0f40b60..0ccef1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -48,6 +48,7 @@
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
@@ -62,6 +63,7 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.context.ServerContext;
@@ -95,7 +97,6 @@
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
import org.apache.hyracks.util.ExitUtil;
-import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.util.PidHelper;
import org.apache.hyracks.util.trace.ITracer;
import org.apache.hyracks.util.trace.Tracer;
@@ -183,6 +184,7 @@
private final ConfigManager configManager;
private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>();
+ private NodeStatus status = NodeStatus.BOOTING;
static {
ExitUtil.init();
@@ -627,6 +629,14 @@
return workQueue;
}
+ public synchronized NodeStatus getNodeStatus() {
+ return status;
+ }
+
+ public synchronized void setNodeStatus(NodeStatus status) {
+ this.status = status;
+ }
+
private class HeartbeatTask implements Runnable {
private final Semaphore delayBlock = new Semaphore(0);
private final IClusterController cc;