[NO ISSUE][TX] Ensure TxnIdFactory Value is Initialized
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Report local max txn id after node registration.
- Add node status BOOTING.
- Distinguish between node first time registration and
registration after restarting by using NodeStatus
BOOTING to respond with the proper node post
registration tasks.
- Rename node status ALIVE -> ACTIVE.
- Rename StartupTask* to RegistrationTasks*
Change-Id: I6899c9e7d6e744ca92d0108556e086a23639d78b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2151
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7b08f68..e77d535 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -481,7 +481,9 @@
@Override
public synchronized void unexportMetadataNodeStub() throws RemoteException {
- UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false);
+ if (metadataNodeStub != null) {
+ UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false);
+ }
metadataNodeStub = null;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
similarity index 87%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
index 22d3cde..86f7d1c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
@@ -19,18 +19,18 @@
package org.apache.asterix.app.nc.task;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.asterix.runtime.message.ReportLocalCountersMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class ReportMaxResourceIdTask implements INCLifecycleTask {
+public class ReportLocalCountersTask implements INCLifecycleTask {
private static final long serialVersionUID = 1L;
@Override
public void perform(IControllerService cs) throws HyracksDataException {
- ReportMaxResourceIdMessage.send((NodeControllerService) cs);
+ ReportLocalCountersMessage.send((NodeControllerService) cs);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index 4ac1305..23f225e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -33,7 +33,7 @@
import org.apache.asterix.app.nc.task.CheckpointTask;
import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartFailbackTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
@@ -43,8 +43,8 @@
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage;
import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage;
import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage;
@@ -431,10 +431,10 @@
@Override
public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
switch (message.getType()) {
- case STARTUP_TASK_REQUEST:
- process((StartupTaskRequestMessage) message);
+ case REGISTRATION_TASKS_REQUEST:
+ process((RegistrationTasksRequestMessage) message);
break;
- case STARTUP_TASK_RESULT:
+ case REGISTRATION_TASKS_RESULT:
process((NCLifecycleTaskReportMessage) message);
break;
case TAKEOVER_PARTITION_RESPONSE:
@@ -483,7 +483,7 @@
currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
}
- private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
final SystemState state = msg.getState();
List<INCLifecycleTask> tasks;
@@ -493,7 +493,7 @@
// failed node returned. Need to start failback process
tasks = buildFailbackStartupSequence();
}
- StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
@@ -504,7 +504,7 @@
private List<INCLifecycleTask> buildFailbackStartupSequence() {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new StartFailbackTask());
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new StartLifecycleComponentsTask());
return tasks;
}
@@ -517,7 +517,7 @@
tasks.add(new MetadataBootstrapTask());
}
tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
if (isMetadataNode) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
index 1b57403..3341813 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -35,14 +35,14 @@
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
import org.apache.asterix.app.nc.task.RemoteRecoveryTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage;
import org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -123,10 +123,10 @@
@Override
public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
switch (message.getType()) {
- case STARTUP_TASK_REQUEST:
- process((StartupTaskRequestMessage) message);
+ case REGISTRATION_TASKS_REQUEST:
+ process((RegistrationTasksRequestMessage) message);
break;
- case STARTUP_TASK_RESULT:
+ case REGISTRATION_TASKS_RESULT:
process((NCLifecycleTaskReportMessage) message);
break;
case REPLAY_LOGS_RESPONSE:
@@ -150,7 +150,7 @@
}
}
- private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
final SystemState state = msg.getState();
final boolean isParticipant = replicationStrategy.isParticipant(nodeId);
@@ -160,7 +160,7 @@
} else {
tasks = buildParticipantStartupSequence(nodeId, state);
}
- StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
@@ -199,7 +199,7 @@
tasks.add(rt);
}
tasks.add(new ExternalLibrarySetupTask(false));
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
return tasks;
@@ -234,7 +234,7 @@
tasks.add(new MetadataBootstrapTask());
}
tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
if (isMetadataNode) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
index b9ea135..a273845 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -32,11 +32,11 @@
import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
-import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -48,12 +48,13 @@
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName());
- IClusterStateManager clusterManager;
+ private IClusterStateManager clusterManager;
private String metadataNodeId;
private Set<String> pendingStartupCompletionNodes = new HashSet<>();
private ICCMessageBroker messageBroker;
@@ -76,10 +77,10 @@
@Override
public void process(INCLifecycleMessage message) throws HyracksDataException {
switch (message.getType()) {
- case STARTUP_TASK_REQUEST:
- process((StartupTaskRequestMessage) message);
+ case REGISTRATION_TASKS_REQUEST:
+ process((RegistrationTasksRequestMessage) message);
break;
- case STARTUP_TASK_RESULT:
+ case REGISTRATION_TASKS_RESULT:
process((NCLifecycleTaskReportMessage) message);
break;
default:
@@ -100,10 +101,10 @@
metadataNodeId = clusterManager.getCurrentMetadataNodeId();
}
- private void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
- List<INCLifecycleTask> tasks = buildNCStartupSequence(msg.getNodeId(), msg.getState());
- StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+ RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
@@ -126,7 +127,16 @@
}
}
- private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) {
+ private List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ LOGGER.log(Level.INFO, () -> "Building registration tasks for node: " + nodeId + " with state: " + state);
+ final boolean isMetadataNode = nodeId.equals(metadataNodeId);
+ if (nodeStatus == NodeStatus.ACTIVE) {
+ /*
+ * if the node state is already ACTIVE then it completed
+ * booting and just re-registering with a new/failed CC.
+ */
+ return buildActiveNCRegTasks(isMetadataNode);
+ }
final List<INCLifecycleTask> tasks = new ArrayList<>();
if (state == SystemState.CORRUPTED) {
//need to perform local recovery for node partitions
@@ -134,12 +144,11 @@
.stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
tasks.add(rt);
}
- final boolean isMetadataNode = nodeId.equals(metadataNodeId);
if (isMetadataNode) {
tasks.add(new MetadataBootstrapTask());
}
tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
- tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new ReportLocalCountersTask());
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
if (isMetadataNode) {
@@ -147,4 +156,15 @@
}
return tasks;
}
+
+ private List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) {
+ final List<INCLifecycleTask> tasks = new ArrayList<>();
+ if (metadataNode) {
+ // need to unbind from old distributed state then rebind to new one
+ tasks.add(new BindMetadataNodeTask(false));
+ tasks.add(new BindMetadataNodeTask(true));
+ }
+ tasks.add(new ReportLocalCountersTask());
+ return tasks;
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 2b32e1f..b654fd8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -58,6 +58,6 @@
@Override
public MessageType getType() {
- return MessageType.STARTUP_TASK_RESULT;
+ return MessageType.REGISTRATION_TASKS_RESULT;
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
similarity index 69%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index 21dee9c..075c415 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -26,27 +26,32 @@
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class StartupTaskRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
+public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
- private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(RegistrationTasksRequestMessage.class.getName());
private static final long serialVersionUID = 1L;
private final SystemState state;
private final String nodeId;
+ private final NodeStatus nodeStatus;
- public StartupTaskRequestMessage(String nodeId, SystemState state) {
+ public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state) {
this.state = state;
this.nodeId = nodeId;
+ this.nodeStatus = nodeStatus;
}
- public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException {
+ public static void send(NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState)
+ throws HyracksDataException {
try {
- StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState);
+ RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+ systemState);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg);
} catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
+ LOGGER.log(Level.SEVERE, "Unable to send RegistrationTasksRequestMessage to CC", e);
throw HyracksDataException.create(e);
}
}
@@ -64,8 +69,13 @@
return nodeId;
}
+ public NodeStatus getNodeStatus() {
+ return nodeStatus;
+ }
+
@Override
public MessageType getType() {
- return MessageType.STARTUP_TASK_REQUEST;
+ return MessageType.REGISTRATION_TASKS_REQUEST;
}
+
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
similarity index 90%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index b941343..13525e3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -32,14 +32,14 @@
import org.apache.hyracks.control.nc.NCShutdownHook;
import org.apache.hyracks.util.ExitUtil;
-public class StartupTaskResponseMessage implements INCLifecycleMessage, INcAddressedMessage {
+public class RegistrationTasksResponseMessage implements INCLifecycleMessage, INcAddressedMessage {
- private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(RegistrationTasksResponseMessage.class.getName());
private static final long serialVersionUID = 1L;
private final String nodeId;
private final List<INCLifecycleTask> tasks;
- public StartupTaskResponseMessage(String nodeId, List<INCLifecycleTask> tasks) {
+ public RegistrationTasksResponseMessage(String nodeId, List<INCLifecycleTask> tasks) {
this.nodeId = nodeId;
this.tasks = tasks;
}
@@ -88,6 +88,6 @@
@Override
public MessageType getType() {
- return MessageType.STARTUP_TASK_RESPONSE;
+ return MessageType.REGISTRATION_TASKS_RESPONSE;
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 3d7f870..a18535d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -26,7 +26,7 @@
import java.util.logging.Logger;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
-import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.AsterixExtension;
@@ -48,6 +48,7 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileDeviceResolver;
@@ -67,7 +68,6 @@
private String nodeId;
private boolean stopInitiated;
private boolean startupCompleted;
- private SystemState systemState;
protected WebManager webManager;
@Override
@@ -117,9 +117,8 @@
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- systemState = recoveryMgr.getSystemState();
-
- if (systemState == SystemState.PERMANENT_DATA_LOSS) {
+ final SystemState stateOnStartup = recoveryMgr.getSystemState();
+ if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("System state: " + SystemState.PERMANENT_DATA_LOSS);
LOGGER.info("Node ID: " + nodeId);
@@ -187,20 +186,27 @@
// Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
- if (systemState == SystemState.PERMANENT_DATA_LOSS
- && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) {
- systemState = SystemState.BOOTSTRAPPING;
+ IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+ SystemState state = recoveryMgr.getSystemState();
+ if (state == SystemState.PERMANENT_DATA_LOSS && (nodeProperties.isInitialRun() || nodeProperties
+ .isVirtualNc())) {
+ state = SystemState.BOOTSTRAPPING;
}
- // Request startup tasks from CC
- StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+ // Request registration tasks from CC
+ RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
+ NodeStatus.BOOTING, state);
startupCompleted = true;
}
@Override
public void onRegisterNode() throws Exception {
if (startupCompleted) {
- // Request startup tasks from CC
- StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState);
+ /*
+ * If the node completed its startup before, then this is a re-registration with
+ * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE
+ */
+ RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
+ NodeStatus.ACTIVE, SystemState.HEALTHY);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
index 87b0856..cb9fa8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
@@ -22,16 +22,16 @@
public interface INCLifecycleMessage extends IMessage {
- public enum MessageType {
+ enum MessageType {
REPLAY_LOGS_REQUEST,
REPLAY_LOGS_RESPONSE,
PREPARE_FAILBACK_REQUEST,
PREPARE_FAILBACK_RESPONSE,
COMPLETE_FAILBACK_REQUEST,
COMPLETE_FAILBACK_RESPONSE,
- STARTUP_TASK_REQUEST,
- STARTUP_TASK_RESPONSE,
- STARTUP_TASK_RESULT,
+ REGISTRATION_TASKS_REQUEST,
+ REGISTRATION_TASKS_RESPONSE,
+ REGISTRATION_TASKS_RESULT,
TAKEOVER_PARTITION_REQUEST,
TAKEOVER_PARTITION_RESPONSE,
TAKEOVER_METADATA_NODE_REQUEST,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
similarity index 76%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index 277c0ba..3f8ced8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -27,27 +27,27 @@
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class ReportMaxResourceIdMessage implements ICcAddressedMessage {
+public class ReportLocalCountersMessage implements ICcAddressedMessage {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(ReportLocalCountersMessage.class.getName());
private final long maxResourceId;
+ private final long maxTxnId;
private final String src;
- public ReportMaxResourceIdMessage(String src, long maxResourceId) {
+ public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId) {
this.src = src;
this.maxResourceId = maxResourceId;
- }
-
- public long getMaxResourceId() {
- return maxResourceId;
+ this.maxTxnId = maxTxnId;
}
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+ TxnIdFactory.ensureMinimumId(maxTxnId);
resourceIdManager.report(src, maxResourceId);
}
@@ -56,17 +56,19 @@
INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
- ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
+ long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+ ReportLocalCountersMessage countersMessage =
+ new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId);
try {
- ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
+ ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(countersMessage);
} catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
+ LOGGER.log(Level.SEVERE, "Unable to report local counters", e);
throw HyracksDataException.create(e);
}
}
@Override
public String toString() {
- return ReportMaxResourceIdMessage.class.getSimpleName();
+ return ReportLocalCountersMessage.class.getSimpleName();
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
similarity index 87%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
index a43376d..785ad2f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
@@ -23,16 +23,16 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class ReportMaxResourceIdRequestMessage implements INcAddressedMessage {
+public class ReportLocalCountersRequestMessage implements INcAddressedMessage {
private static final long serialVersionUID = 1L;
@Override
public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ReportMaxResourceIdMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService());
+ ReportLocalCountersMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService());
}
@Override
public String toString() {
- return ReportMaxResourceIdRequestMessage.class.getSimpleName();
+ return ReportLocalCountersRequestMessage.class.getSimpleName();
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index decc1a9..a2f4aa1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -61,7 +61,7 @@
private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager,
ICCMessageBroker broker) throws Exception {
Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes();
- ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
+ ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage();
for (String nodeId : getParticipantNodes) {
if (!resourceIdManager.reported(nodeId)) {
broker.sendApplicationMessageToNC(msg, nodeId);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
index 71d7f56..eb59e74 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java
@@ -37,6 +37,6 @@
}
public static void ensureMinimumId(long id) {
- TxnIdFactory.id.set(id);
+ TxnIdFactory.id.updateAndGet(current -> Math.max(current, id));
}
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
index b84f1f2..10a9a3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.api.client;
public enum NodeStatus {
- ALIVE,
+ ACTIVE,
+ BOOTING,
DEAD
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 3cd6235..4928564 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -136,7 +136,7 @@
public Map<String, NodeControllerInfo> getNodeControllerInfoMap() {
Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
nodeRegistry.forEach(
- (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ALIVE, ncState.getDataPort(),
+ (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(),
ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores())));
return result;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
index 445a15c..bb28c79 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -97,10 +97,10 @@
Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.",
dataPort, resultPort, messagingPort);
ncNameToNcInfos.put("nc7",
- new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", dataPort),
+ new NodeControllerInfo("nc7", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.7", dataPort),
new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2));
ncNameToNcInfos.put("nc12",
- new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", dataPort),
+ new NodeControllerInfo("nc12", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.12", dataPort),
new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2));
InputSplit[] fileSplits = new InputSplit[12];
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index c3d86e8..1814e85 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -135,7 +135,7 @@
String ncId = ncNamePrefix + i;
String ncAddress = addressPrefix + i;
ncNameToNcInfos.put(ncId,
- new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort),
+ new NodeControllerInfo(ncId, NodeStatus.ACTIVE, new NetworkAddress(ncAddress, netPort),
new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort), 2));
}
return ncNameToNcInfos;