[ASTERIXDB-3196][*DB] Cluster state for compute-storage separation
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Implement changes required to drive cluster state based on
compute-storage partitions map.
- Persist index checkpoints to cloud storage.
- Remove eager caching from NC startup tasks.
- Fixes for static data partitioning.
Change-Id: I217da04d06884d841c4a56aee3ab9815cc659de7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17553
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index f60ed63..7f0b529 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -204,8 +204,7 @@
storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages());
lsmIOScheduler = createIoScheduler(storageProperties);
metadataMergePolicyFactory = new ConcurrentMergePolicyFactory();
- // TODO do we want to write checkpoints for cloud?
- indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager);
+ indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(persistenceIOManager);
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
new PersistentLocalResourceRepositoryFactory(persistenceIOManager, indexCheckpointManagerProvider,
persistedResourceRegistry);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 4f9613d..449dd27 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -35,7 +35,6 @@
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
-import org.apache.asterix.app.nc.task.CloudToLocalStorageCachingTask;
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
@@ -52,7 +51,6 @@
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
@@ -222,11 +220,6 @@
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
-
- if (((ICcApplicationContext) (serviceContext.getControllerService()).getApplicationContext())
- .isCloudDeployment()) {
- tasks.add(new CloudToLocalStorageCachingTask(activePartitions));
- }
if (state == SystemState.CORRUPTED) {
// need to perform local recovery for node active partitions
LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 3d0fee8..5f714f4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -286,4 +286,10 @@
* @return the count of storage partitions
*/
int getStoragePartitionsCount();
+
+ /**
+ * Sets the compute-storage partitions map.
+ * @param map the compute-storage partitions map to set
+ */
+ void setComputeStoragePartitionsMap(StorageComputePartitionsMap map);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
index 48b2ea1..6561d05 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
public class StorageComputePartitionsMap {
@@ -53,9 +55,10 @@
}
}
int[][] computerToStoArray = new int[computeToStoragePartitions.size()][];
+ int partitionIdx = 0;
for (Map.Entry<Integer, List<Integer>> integerListEntry : computeToStoragePartitions.entrySet()) {
- computerToStoArray[integerListEntry.getKey()] =
- integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+ computerToStoArray[partitionIdx] = integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+ partitionIdx++;
}
return computerToStoArray;
}
@@ -94,4 +97,8 @@
}
return newMap;
}
+
+ public Set<String> getComputeNodes() {
+ return stoToComputeLocation.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b07a03e..ea1b9e6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,7 +32,6 @@
import java.util.Optional;
import java.util.stream.Collectors;
-import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -891,13 +890,7 @@
} else {
numElementsHint = Long.parseLong(numElementsHintString);
}
- int numPartitions = 0;
- List<String> nodeGroup =
- MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
- IClusterStateManager csm = appCtx.getClusterStateManager();
- for (String nd : nodeGroup) {
- numPartitions += csm.getNodePartitionsCount(nd);
- }
+ int numPartitions = getPartitioningProperties(dataset).getNumberOfPartitions();
return numElementsHint / numPartitions;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 92ea173..0128478 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -30,6 +30,7 @@
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -155,7 +156,16 @@
if (active) {
updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
- activateNodePartitions(nodeId, activePartitions);
+ if (appCtx.isCloudDeployment()) {
+ // node compute partitions never change: activate all of the node's partitions, overriding the passed-in set
+ ClusterPartition[] nodePartitions = getNodePartitions(nodeId);
+ activePartitions =
+ Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+ activateNodePartitions(nodeId, activePartitions);
+ } else {
+ activateNodePartitions(nodeId, activePartitions);
+ }
+
} else {
participantNodes.remove(nodeId);
deactivateNodePartitions(nodeId);
@@ -183,16 +193,7 @@
return;
}
resetClusterPartitionConstraint();
- // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE
- if (clusterPartitions.isEmpty()
- || clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation)) {
- LOGGER.info("Cluster does not have any registered partitions");
- setState(ClusterState.UNUSABLE);
- return;
- }
-
- // exclude partitions that are pending activation
- if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) {
+ if (isClusterUnusable()) {
setState(ClusterState.UNUSABLE);
return;
}
@@ -310,9 +311,7 @@
clusterActiveLocations.removeAll(pendingRemoval);
clusterPartitionConstraint =
new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
- if (appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC) {
- storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this);
- }
+ resetStorageComputeMap();
}
@Override
@@ -512,6 +511,11 @@
return storageComputePartitionsMap;
}
+ @Override
+ public synchronized void setComputeStoragePartitionsMap(StorageComputePartitionsMap map) {
+ this.storageComputePartitionsMap = map;
+ }
+
private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
@@ -543,6 +547,36 @@
false));
}
+ private synchronized boolean isClusterUnusable() {
+ // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE
+ if (clusterPartitions.isEmpty()
+ || clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation)) {
+ LOGGER.info("Cluster does not have any registered partitions");
+ return true;
+ }
+ if (appCtx.isCloudDeployment() && storageComputePartitionsMap != null) {
+ Set<String> computeNodes = storageComputePartitionsMap.getComputeNodes();
+ if (!participantNodes.containsAll(computeNodes)) {
+ LOGGER.info("Cluster missing compute nodes; required {}, current {}", computeNodes, participantNodes);
+ return true;
+ }
+ } else {
+ // unusable if any partition is neither active nor pending activation
+ if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private synchronized void resetStorageComputeMap() {
+ if (storageComputePartitionsMap == null
+ && appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC
+ && !isClusterUnusable()) {
+ storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this);
+ }
+ }
+
private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) {
final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId);
if (ncConfig == null) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 080204e..25b9610 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -646,8 +646,8 @@
public synchronized List<FileReference> getOnDiskPartitions() throws HyracksDataException {
List<FileReference> onDiskPartitions = new ArrayList<>();
for (FileReference root : storageRoots) {
- Collection<FileReference> partitions = ioManager.list(root,
- (dir, name) -> dir.isDirectory() && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
+ Collection<FileReference> partitions = ioManager.list(root, (dir, name) -> dir != null && dir.isDirectory()
+ && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
if (partitions != null) {
onDiskPartitions.addAll(partitions);
}