[NO ISSUE][STO] Clean up invalid resources on bootstrap
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add new node startup tasks to delete invalid resources.
- Ensure index max disk LSN is initialized during local
recovery even if the index is already registered.
Change-Id: Ia13182e96ec65951e837f71ffc4db3e92a43a7b0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12766
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 0359cf1..ffde7d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -160,7 +160,7 @@
@Override
public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
state = SystemState.RECOVERING;
- LOGGER.info("starting recovery ...");
+ LOGGER.info("starting recovery for partitions {}", partitions);
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -363,10 +363,8 @@
datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
try {
- final DatasetResourceReference resourceReference =
- DatasetResourceReference.of(localResource);
- maxDiskLastLsn =
- indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+ maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+ indexCheckpointManagerProvider);
} catch (HyracksDataException e) {
datasetLifecycleManager.close(localResource.getPath());
throw e;
@@ -374,7 +372,12 @@
//#. set resourceId and maxDiskLastLSN to the map
resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
} else {
- maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ if (!resourceId2MaxLSNMap.containsKey(resourceId)) {
+ maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+ indexCheckpointManagerProvider);
+ } else {
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ }
}
// lsn @ maxDiskLastLsn is either a flush log or a master replica log
if (lsn >= maxDiskLastLsn) {
@@ -858,6 +861,19 @@
index.resetCurrentComponentIndex();
}
+    /**
+     * Reads the low watermark LSN recorded in the index checkpoint of {@code localResource}.
+     * <p>
+     * This method deliberately does not close the resource when the lookup fails. The call site
+     * that registers and opens the index already closes it in its own
+     * {@code catch (HyracksDataException e)} block, so closing here as well would close the same
+     * path twice for a single open. The call site that finds the index already registered did not
+     * open it, so it has nothing to undo on failure.
+     * NOTE(review): the exact effect of a second close depends on the
+     * {@code IDatasetLifecycleManager} implementation -- confirm against its contract.
+     *
+     * @param localResource the resource whose index checkpoint is consulted
+     * @param datasetLifecycleManager not used; retained so existing call sites keep compiling
+     * @param indexCheckpointManagerProvider provider used to look up the checkpoint manager of the
+     *            resource
+     * @return the low watermark reported by the resource's index checkpoint manager
+     * @throws HyracksDataException if the checkpoint lookup fails; the exception is propagated
+     *             unchanged
+     */
+    private long getResourceLowWaterMark(LocalResource localResource, IDatasetLifecycleManager datasetLifecycleManager,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) throws HyracksDataException {
+        final DatasetResourceReference resourceReference = DatasetResourceReference.of(localResource);
+        return indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+    }
+
private class JobEntityCommits {
private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
private final long txnId;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index dd1b6e7..655f9da 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -123,8 +123,6 @@
if (!partitions.contains(partition)) {
return;
}
- final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
- datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy());
closePartitionResources(partition);
final List<IPartitionReplica> partitionReplicas = getReplicas(partition);
for (IPartitionReplica replica : partitionReplicas) {
@@ -139,10 +137,12 @@
}
public void closePartitionResources(int partition) throws HyracksDataException {
+ final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+ //TODO(mhubail) we can flush only datasets of the requested partition
+ datasetLifecycleManager.flushAllDatasets();
final PersistentLocalResourceRepository resourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
- final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
for (LocalResource resource : partitionResources.values()) {
datasetLifecycleManager.closeIfOpen(resource.getPath());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
new file mode 100644
index 0000000..5471fb8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Node lifecycle task that removes misplaced metadata indexes from local storage.
+ * <p>
+ * When performed, it asks the node's {@link PersistentLocalResourceRepository} to delete every
+ * index whose resource belongs to a metadata dataset but whose partition differs from the
+ * metadata partition this task was created with.
+ */
+public class LocalStorageCleanupTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    /** The only partition on which metadata dataset indexes are treated as valid. */
+    private final int metadataPartitionId;
+
+    /**
+     * @param metadataPartitionId id of the partition that is allowed to hold metadata indexes
+     */
+    public LocalStorageCleanupTask(int metadataPartitionId) {
+        this.metadataPartitionId = metadataPartitionId;
+    }
+
+    /**
+     * Deletes the indexes of metadata datasets that live outside the metadata partition.
+     *
+     * @param ccId not used by this task
+     * @param cs controller service whose application context is cast to
+     *            {@link INcApplicationContext}
+     * @throws HyracksDataException if the local resource repository fails during the cleanup
+     */
+    @Override
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) cs.getApplicationContext();
+        final PersistentLocalResourceRepository repository =
+                (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository();
+        repository.deleteInvalidIndexes(resource -> {
+            final DatasetLocalResource datasetResource = (DatasetLocalResource) resource.getResource();
+            // resources of non-metadata datasets are never touched by this task
+            if (!MetadataIndexImmutableProperties.isMetadataDataset(datasetResource.getDatasetId())) {
+                return false;
+            }
+            // a metadata index is invalid only when it sits outside the metadata partition
+            return datasetResource.getPartition() != metadataPartitionId;
+        });
+    }
+
+    @Override
+    public String toString() {
+        return "LocalStorageCleanupTask{" + "metadataPartitionId=" + metadataPartitionId + '}';
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index f164773..97e1a56 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -37,6 +37,7 @@
import org.apache.asterix.app.nc.task.CheckpointTask;
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
import org.apache.asterix.app.nc.task.RetrieveLibrariesTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
@@ -213,6 +214,8 @@
Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
+ int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
+ tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
if (state == SystemState.CORRUPTED) {
// need to perform local recovery for node active partitions
LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
@@ -222,7 +225,7 @@
tasks.add(new StartReplicationServiceTask());
}
if (metadataNode) {
- tasks.add(new MetadataBootstrapTask(clusterManager.getMetadataPartition().getPartitionId()));
+ tasks.add(new MetadataBootstrapTask(metadataPartitionId));
}
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 556ab6a..f753868 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -273,6 +273,24 @@
return resourcesMap;
}
+    /**
+     * Deletes the directory of every index whose local resource is rejected by the caller.
+     * <p>
+     * Each storage root is scanned for files accepted by {@code METADATA_FILES_FILTER}; each such
+     * file is read as a {@link LocalResource} and, when {@code filter} accepts it, the file's
+     * parent directory is deleted. After all roots have been processed the resource cache is
+     * invalidated. If an exception interrupts the scan, the cache is left as it was.
+     *
+     * @param filter returns {@code true} for resources whose index directory must be deleted
+     * @throws HyracksDataException if an {@link IOException} occurs while reading a resource or
+     *             deleting a directory
+     */
+    public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
+        for (Path storageRoot : storageRoots) {
+            final Collection<File> metadataFiles =
+                    FileUtils.listFiles(storageRoot.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER);
+            try {
+                for (File metadataFile : metadataFiles) {
+                    if (!filter.test(readLocalResource(metadataFile))) {
+                        continue;
+                    }
+                    // the metadata file lives directly inside the index directory to remove
+                    final File indexDir = metadataFile.getParentFile();
+                    LOGGER.warn("deleting invalid metadata index {}", indexDir);
+                    IoUtil.delete(indexDir);
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+        resourceCache.invalidateAll();
+    }
+
public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
return getResources(p -> true);
}