Merge branch 'gerrit/mad-hatter'
Change-Id: If9d5238e4c3ae94eb17b951f07259432b85ead9d
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 18191d6..0d86cb9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -51,8 +52,9 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -61,13 +63,16 @@
private static final Logger LOGGER = LogManager.getLogger();
protected IClusterStateManager clusterManager;
protected volatile String metadataNodeId;
- protected Set<String> pendingStartupCompletionNodes = new HashSet<>();
+ protected Set<String> pendingStartupCompletionNodes = Collections.synchronizedSet(new HashSet<>());
protected final ICCMessageBroker messageBroker;
private final boolean replicationEnabled;
+ private final IGatekeeper gatekeeper;
public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) {
this.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
this.replicationEnabled = replicationEnabled;
+ this.gatekeeper =
+ ((ClusterControllerService) serviceCtx.getControllerService()).getApplication().getGatekeeper();
}
@Override
@@ -120,7 +125,14 @@
}
private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
- pendingStartupCompletionNodes.remove(msg.getNodeId());
+ if (!pendingStartupCompletionNodes.remove(msg.getNodeId())) {
+ LOGGER.warn("Received unexpected startup completion message from node {}", msg.getNodeId());
+ }
+ if (!gatekeeper.isAuthorized(msg.getNodeId())) {
+ LOGGER.warn("Node {} lost authorization before startup completed; ignoring registration result",
+ msg.getNodeId());
+ return;
+ }
if (msg.isSuccess()) {
clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
if (msg.getNodeId().equals(metadataNodeId)) {
@@ -128,9 +140,7 @@
}
clusterManager.refreshState();
} else {
- if (LOGGER.isErrorEnabled()) {
- LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete startup. ", msg.getException());
- }
+ LOGGER.error("Node {} failed to complete startup", msg.getNodeId(), msg.getException());
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index b7a8c63..9cc295e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
import org.apache.asterix.runtime.utils.CcApplicationContext;
@@ -211,10 +212,13 @@
final ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class);
JobIdFactory jobIdFactory = new JobIdFactory(CcId.valueOf(0));
Mockito.when(ccs.getJobIdFactory()).thenReturn(jobIdFactory);
+ final CCApplication ccApplication = Mockito.mock(CCApplication.class);
+ Mockito.when(ccs.getApplication()).thenReturn(ccApplication);
Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
Mockito.when(iccServiceContext.getControllerService()).thenReturn(ccs);
Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
+ Mockito.when(ccApplication.getGatekeeper()).thenReturn(nodeId -> true);
NcLifecycleCoordinator coordinator =
new NcLifecycleCoordinator(ccApplicationContext.getServiceContext(), false);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 6c39372..a37e6e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -120,7 +120,6 @@
/**
* Register the specified node partitions with the specified nodeId with this cluster state manager
- * then calls {@link IClusterStateManager#refreshState()}
*
* @param nodeId
* @param nodePartitions
@@ -130,7 +129,6 @@
/**
* De-register the specified node's partitions from this cluster state manager
- * then calls {@link IClusterStateManager#refreshState()}
*
* @param nodeId
* @throws HyracksDataException