[NO ISSUE][CLUS] Include NC Local Counters in Startup Message
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Include NC local counters in the NC startup tasks
completion completion message.
- Ensure no NC is marked as active without its local
counters reported.
- Remove the need for individual NC local counters
message.
- Clean up ITxnIdFactory and IResourceIdManager APIs.
Change-Id: Ief8b9d43783ea22810c6fdb29947a1284e32daee
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2767
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
deleted file mode 100644
index 53b13e8..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc.task;
-
-import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.runtime.message.ReportLocalCountersMessage;
-import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class ReportLocalCountersTask implements INCLifecycleTask {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
- ReportLocalCountersMessage.send(ccId, (NodeControllerService) cs);
- }
-
- @Override
- public String toString() {
- return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
- }
-}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 043e02e..961593f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -31,7 +31,6 @@
import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
-import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
import org.apache.asterix.app.nc.task.UpdateNodeStatusTask;
@@ -79,7 +78,7 @@
@Override
public void notifyNodeFailure(String nodeId) throws HyracksDataException {
pendingStartupCompletionNodes.remove(nodeId);
- clusterManager.updateNodePartitions(nodeId, false);
+ clusterManager.updateNodeState(nodeId, false, null);
if (nodeId.equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, false);
}
@@ -112,20 +111,18 @@
private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
- if (!tasks.isEmpty()) {
- RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
- try {
- messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+ RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
+ try {
+ messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
}
}
private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
pendingStartupCompletionNodes.remove(msg.getNodeId());
if (msg.isSuccess()) {
- clusterManager.updateNodePartitions(msg.getNodeId(), true);
+ clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
if (msg.getNodeId().equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, true);
}
@@ -156,7 +153,6 @@
if (metadataNode) {
tasks.add(new BindMetadataNodeTask());
}
- tasks.add(new ReportLocalCountersTask());
return tasks;
}
@@ -203,7 +199,6 @@
tasks.add(new ExportMetadataNodeTask(true));
tasks.add(new BindMetadataNodeTask());
}
- tasks.add(new ReportLocalCountersTask());
tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE));
return tasks;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 5ac3a0c..21022ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.app.replication.message;
+import org.apache.asterix.common.utils.NcLocalCounters;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -29,14 +30,16 @@
private final String nodeId;
private final boolean success;
private Throwable exception;
+ private final NcLocalCounters localCounters;
- public NCLifecycleTaskReportMessage(String nodeId, boolean success) {
+ public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
this.nodeId = nodeId;
this.success = success;
+ this.localCounters = localCounters;
}
@Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException {
appCtx.getNcLifecycleCoordinator().process(this);
}
@@ -56,6 +59,10 @@
this.exception = exception;
}
+ public NcLocalCounters getLocalCounters() {
+ return localCounters;
+ }
+
@Override
public MessageType getType() {
return MessageType.REGISTRATION_TASKS_RESULT;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 868c2ad..2d2fe97 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.asterix.common.utils.NcLocalCounters;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.messaging.CcIdentifiedMessage;
@@ -28,6 +29,7 @@
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -68,7 +70,9 @@
success = false;
exception = e;
}
- NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success);
+ NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
+ (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
+ NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
result.setException(exception);
try {
broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 4beb44a..9d367ac 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -31,14 +31,19 @@
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.utils.NcLocalCounters;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
+import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
import org.apache.hyracks.control.common.config.ConfigManager;
@@ -194,8 +199,7 @@
private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
throws HyracksDataException {
- NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true);
- applicationContext.getResourceIdManager().report(nodeId, 0);
+ NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
applicationContext.getNcLifecycleCoordinator().process(msg);
}
@@ -204,7 +208,12 @@
ConfigManager configManager = new ConfigManager(null);
IApplicationConfig applicationConfig = new ConfigManagerApplicationConfig(configManager);
ICCServiceContext iccServiceContext = Mockito.mock(CCServiceContext.class);
+ final ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class);
+ JobIdFactory jobIdFactory = new JobIdFactory(CcId.valueOf(0));
+ Mockito.when(ccs.getJobIdFactory()).thenReturn(jobIdFactory);
Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
+ Mockito.when(iccServiceContext.getControllerService()).thenReturn(ccs);
+
Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
NcLifecycleCoordinator coordinator =
@@ -225,6 +234,9 @@
IGlobalRecoveryManager globalRecoveryManager = Mockito.mock(IGlobalRecoveryManager.class);
Mockito.when(globalRecoveryManager.isRecoveryCompleted()).thenReturn(true);
Mockito.when(ccApplicationContext.getGlobalRecoveryManager()).thenReturn(globalRecoveryManager);
+
+ BulkTxnIdFactory bulkTxnIdFactory = new BulkTxnIdFactory();
+ Mockito.when(ccApplicationContext.getTxnIdFactory()).thenReturn(bulkTxnIdFactory);
return ccApplicationContext;
}
@@ -238,4 +250,12 @@
Mockito.when(metadataProperties.getNodePartitions()).thenReturn(nodePartitionsMap);
return metadataProperties;
}
+
+ private NcLocalCounters mockLocalCounters() {
+ final NcLocalCounters localCounters = Mockito.mock(NcLocalCounters.class);
+ Mockito.when(localCounters.getMaxJobId()).thenReturn(1000L);
+ Mockito.when(localCounters.getMaxResourceId()).thenReturn(1000L);
+ Mockito.when(localCounters.getMaxTxnId()).thenReturn(1000L);
+ return localCounters;
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index ee7fe5c..72c4099 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -287,5 +287,9 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index dda9ffd..5e99eec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.utils.NcLocalCounters;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.config.IOption;
@@ -59,9 +60,10 @@
*
* @param nodeId
* @param active
+ * @param ncLocalCounters
* @throws HyracksDataException
*/
- void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException;
+ void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
/**
* Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index ce49ccf6..d36d383 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -18,14 +18,12 @@
*/
package org.apache.asterix.common.transactions;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
public interface IResourceIdManager {
long createResourceId();
boolean reported(String nodeId);
- void report(String nodeId, long maxResourceId) throws HyracksDataException;
+ void report(String nodeId, long maxResourceId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
index be4a1f8..5c28f3f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
@@ -36,7 +36,7 @@
* @param id
* the value to ensure future created transaction ids are larger than
*/
- void ensureMinimumId(long id) throws AlgebricksException;
+ void ensureMinimumId(long id);
/**
* The highest transaction id this factory has created
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
new file mode 100644
index 0000000..7cd61d8
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.io.Serializable;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class NcLocalCounters implements Serializable {
+
+ private final long maxResourceId;
+ private final long maxTxnId;
+ private final long maxJobId;
+
+ private NcLocalCounters(long maxResourceId, long maxTxnId, long maxJobId) {
+ this.maxResourceId = maxResourceId;
+ this.maxTxnId = maxTxnId;
+ this.maxJobId = maxJobId;
+ }
+
+ public static NcLocalCounters collect(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
+ final INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
+ long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
+ MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+ long maxTxnId = appContext.getMaxTxnId();
+ long maxJobId = ncs.getMaxJobId(ccId);
+ return new NcLocalCounters(maxResourceId, maxTxnId, maxJobId);
+ }
+
+ public long getMaxResourceId() {
+ return maxResourceId;
+ }
+
+ public long getMaxTxnId() {
+ return maxTxnId;
+ }
+
+ public long getMaxJobId() {
+ return maxJobId;
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
index 6b3b6a0f..f530afe 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
@@ -62,7 +62,7 @@
}
@Override
- public void ensureMinimumId(long id) throws AlgebricksException {
+ public void ensureMinimumId(long id) {
throw new UnsupportedOperationException();
}
diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml
index 22d1bfc..4cb4aad 100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@ -105,10 +105,6 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-control-nc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>algebricks-common</artifactId>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
deleted file mode 100644
index fe9a5b8..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.control.CcId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class ReportLocalCountersMessage implements ICcAddressedMessage {
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LogManager.getLogger();
- private final long maxResourceId;
- private final long maxTxnId;
- private final long maxJobId;
- private final String src;
-
- public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId, long maxJobId) {
- this.src = src;
- this.maxResourceId = maxResourceId;
- this.maxTxnId = maxTxnId;
- this.maxJobId = maxJobId;
- }
-
- @Override
- public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- try {
- appCtx.getTxnIdFactory().ensureMinimumId(maxTxnId);
- } catch (AlgebricksException e) {
- throw HyracksDataException.create(e);
- }
- resourceIdManager.report(src, maxResourceId);
- ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobIdFactory()
- .setMaxJobId(maxJobId);
- }
-
- public static void send(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
- INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
- long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
- MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
- long maxTxnId = appContext.getMaxTxnId();
- long maxJobId = ncs.getMaxJobId(ccId);
- ReportLocalCountersMessage countersMessage =
- new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId, maxJobId);
- try {
- ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(ccId, countersMessage);
- } catch (Exception e) {
- LOGGER.log(Level.ERROR, "Unable to report local counters", e);
- throw HyracksDataException.create(e);
- }
- }
-
- @Override
- public String toString() {
- return ReportLocalCountersMessage.class.getSimpleName();
- }
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
deleted file mode 100644
index 51f53e7..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.message;
-
-import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.messaging.CcIdentifiedMessage;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-public class ReportLocalCountersRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ReportLocalCountersMessage.send(getCcId(),
- (NodeControllerService) appCtx.getServiceContext().getControllerService());
- }
-
- @Override
- public String toString() {
- return ReportLocalCountersRequestMessage.class.getSimpleName();
- }
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 087913f..c767c52 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -49,7 +49,6 @@
new Exception("Cannot generate global resource id when cluster is not active."));
} else {
response.setException(new Exception("One or more nodes has not reported max resource id."));
- requestMaxResourceID(clusterStateManager, resourceIdManager, broker);
}
}
broker.sendApplicationMessageToNC(response, src);
@@ -58,17 +57,6 @@
}
}
- private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager,
- ICCMessageBroker broker) throws Exception {
- Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(true);
- ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage();
- for (String nodeId : getParticipantNodes) {
- if (!resourceIdManager.reported(nodeId)) {
- broker.sendApplicationMessageToNC(msg, nodeId);
- }
- }
- }
-
@Override
public String toString() {
return ResourceIdRequestMessage.class.getSimpleName();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 6d3077e..5bcd5aa 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ResourceIdManager implements IResourceIdManager {
@@ -48,11 +47,8 @@
}
@Override
- public void report(String nodeId, long maxResourceId) throws HyracksDataException {
+ public void report(String nodeId, long maxResourceId) {
globalResourceId.updateAndGet(prev -> Math.max(maxResourceId, prev));
- if (reportedNodes.add(nodeId)) {
- csm.refreshState();
- }
+ reportedNodes.add(nodeId);
}
-
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
index 542bc17..296ce9c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
@@ -24,7 +24,7 @@
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
-class BulkTxnIdFactory implements ITxnIdFactory {
+public class BulkTxnIdFactory implements ITxnIdFactory {
private final AtomicLong maxId = new AtomicLong();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 8539fa4..8d3187b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -37,12 +37,14 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.common.utils.NcLocalCounters;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -133,8 +135,9 @@
}
@Override
- public synchronized void updateNodePartitions(String nodeId, boolean active) {
+ public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
if (active) {
+ updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
} else {
participantNodes.remove(nodeId);
@@ -182,15 +185,6 @@
setState(ClusterState.UNUSABLE);
return;
}
-
- IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- for (String node : participantNodes) {
- if (!resourceIdManager.reported(node)) {
- LOGGER.info("Partitions are ready but {} has not yet registered its max resource id...", node);
- setState(ClusterState.UNUSABLE);
- return;
- }
- }
// the metadata bootstrap & global recovery must be complete before the cluster can be active
if (metadataNodeActive) {
if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
@@ -452,6 +446,14 @@
return metadataPartition;
}
+ private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
+ final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+ resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
+ appCtx.getTxnIdFactory().ensureMinimumId(localCounters.getMaxTxnId());
+ ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobIdFactory()
+ .setMaxJobId(localCounters.getMaxJobId());
+ }
+
private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) {
ConfigManager configManager =
((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager();