[ASTERIXDB-3364][STO] Use bootstrap marker during metadata bootstrap

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Before bootstrapping the metadata catalog for the first time,
  put a bootstrap marker (file) in the blob storage to indicate
  that the metadata hasn't been fully bootstrapped.
- Delete the bootstrap marker from the blob storage on successful
  metadata bootstrap attempts.
- When checking the system state on missing checkpoints, check
  for the bootstrap marker to detect failed bootstrap attempts.
- If the bootstrap marker is found, delete any existing metadata
  files from the blob storage so that the next bootstrap attempt
  starts from a clean state.

Change-Id: I805bf1cbdfb58690876da0c68840ed2c365742a4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18199
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index a44a695..6b3257d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -58,9 +59,10 @@
         String nodeId = applicationContext.getServiceContext().getNodeId();
         LOGGER.info("Initializing Node {} with storage partitions: {}", nodeId, storagePartitions);
 
+        Checkpoint latestCheckpoint = applicationContext.getTransactionSubsystem().getCheckpointManager().getLatest();
         IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
-        bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId,
-                cleanup);
+        bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId, cleanup,
+                latestCheckpoint == null);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index f58f871..246983d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -21,8 +21,11 @@
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.service.IControllerService;
 
 public class MetadataBootstrapTask implements INCLifecycleTask {
@@ -40,12 +43,33 @@
         try {
             appContext.getReplicaManager().promote(partitionId);
             SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
-            appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS, partitionId);
+            boolean firstBootstrap = state == SystemState.PERMANENT_DATA_LOSS;
+            if (firstBootstrap) {
+                writeBootstrapMarker(appContext);
+            }
+            appContext.initializeMetadata(firstBootstrap, partitionId);
+            if (firstBootstrap) {
+                deleteBootstrapMarker(appContext);
+            }
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
     }
 
+    private void writeBootstrapMarker(INcApplicationContext appContext) throws HyracksDataException {
+        IIOManager persistenceIoManager = appContext.getPersistenceIoManager();
+        FileReference bootstrapMarker = persistenceIoManager
+                .resolve(StoragePathUtil.getBootstrapMarkerRelativePath(appContext.getNamespacePathResolver()));
+        persistenceIoManager.overwrite(bootstrapMarker, new byte[0]);
+    }
+
+    private void deleteBootstrapMarker(INcApplicationContext appContext) throws HyracksDataException {
+        IIOManager persistenceIoManager = appContext.getPersistenceIoManager();
+        FileReference bootstrapMarker = persistenceIoManager
+                .resolve(StoragePathUtil.getBootstrapMarkerRelativePath(appContext.getNamespacePathResolver()));
+        persistenceIoManager.delete(bootstrapMarker);
+    }
+
     @Override
     public String toString() {
         return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index f271f1d..368be26 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -47,7 +47,6 @@
 import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -56,21 +55,19 @@
 
 public abstract class AbstractCloudIOManager extends IOManager implements IPartitionBootstrapper {
     private static final Logger LOGGER = LogManager.getLogger();
-    //TODO(DB): change
-    private final String metadataNamespacePath;
     protected final ICloudClient cloudClient;
     protected final IWriteBufferProvider writeBufferProvider;
     protected final String bucket;
     protected final Set<Integer> partitions;
     protected final List<FileReference> partitionPaths;
     protected final IOManager localIoManager;
+    protected final INamespacePathResolver nsPathResolver;
 
     public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
             INamespacePathResolver nsPathResolver) throws HyracksDataException {
         super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(),
                 ioManager.getQueueSize());
-        this.metadataNamespacePath = FileUtil.joinPath(STORAGE_ROOT_DIR_NAME, PARTITION_DIR_PREFIX + METADATA_PARTITION,
-                nsPathResolver.resolve(MetadataConstants.METADATA_NAMESPACE));
+        this.nsPathResolver = nsPathResolver;
         this.bucket = cloudProperties.getStorageBucket();
         cloudClient = CloudClientProvider.getClient(cloudProperties);
         int numOfThreads = getIODevices().size() * getIOParallelism();
@@ -88,7 +85,9 @@
 
     @Override
     public IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() {
-        if (cloudClient.listObjects(bucket, metadataNamespacePath, IoUtil.NO_OP_FILTER).isEmpty()) {
+        Set<String> existingMetadataFiles = getCloudMetadataPartitionFiles();
+        String bootstrapMarkerPath = StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver);
+        if (existingMetadataFiles.isEmpty() || existingMetadataFiles.contains(bootstrapMarkerPath)) {
             LOGGER.info("First time to initialize this cluster: systemState = PERMANENT_DATA_LOSS");
             return IRecoveryManager.SystemState.PERMANENT_DATA_LOSS;
         } else {
@@ -99,11 +98,15 @@
 
     @Override
     public final void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions,
-            boolean metadataNode, int metadataPartition, boolean cleanup) throws HyracksDataException {
+            boolean metadataNode, int metadataPartition, boolean cleanup, boolean ensureCompleteBootstrap)
+            throws HyracksDataException {
         partitions.clear();
         partitions.addAll(activePartitions);
         if (metadataNode) {
             partitions.add(metadataPartition);
+            if (ensureCompleteBootstrap) {
+                ensureCompleteMetadataBootstrap();
+            }
         }
 
         partitionPaths.clear();
@@ -290,4 +293,26 @@
         cloudClient.write(bucket, key, bytes);
     }
 
+    private Set<String> getCloudMetadataPartitionFiles() {
+        String metadataNamespacePath = StoragePathUtil.getNamespacePath(nsPathResolver,
+                MetadataConstants.METADATA_NAMESPACE, METADATA_PARTITION);
+        return cloudClient.listObjects(bucket, metadataNamespacePath, IoUtil.NO_OP_FILTER);
+    }
+
+    private void ensureCompleteMetadataBootstrap() throws HyracksDataException {
+        Set<String> metadataPartitionFiles = getCloudMetadataPartitionFiles();
+        boolean foundBootstrapMarker =
+                metadataPartitionFiles.contains(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
+        // if the bootstrap file exists, we failed to bootstrap --> delete all partial files in metadata partition
+        if (foundBootstrapMarker) {
+            LOGGER.info(
+                    "detected failed bootstrap attempted, deleting all existing files in the metadata partition: {}",
+                    metadataPartitionFiles);
+            IIOBulkOperation deleteBulkOperation = createDeleteBulkOperation();
+            for (String file : metadataPartitionFiles) {
+                deleteBulkOperation.add(resolve(file));
+            }
+            performBulkOperation(deleteBulkOperation);
+        }
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
index db7b6d6..54090bb 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
@@ -49,7 +49,8 @@
 
     @Override
     public void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions,
-            boolean metadataNode, int metadataPartition, boolean cleanup) throws HyracksDataException {
+            boolean metadataNode, int metadataPartition, boolean cleanup, boolean ensureCompleteBootstrap)
+            throws HyracksDataException {
         for (FileReference onDiskPartition : currentOnDiskPartitions) {
             int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
             if (!activePartitions.contains(partitionNum)) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index 1c7713a..8d28d3a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.cloud.bulk;
 
-import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.clients.ICloudClient;
@@ -49,7 +49,7 @@
          * TODO What about deleting multiple directories?
          *      Actually, is there a case where we delete multiple directories from the cloud?
          */
-        List<String> paths = fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toList());
+        Set<String> paths = fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
         if (paths.isEmpty()) {
             return 0;
         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
index fd9a1b3..6bb4176 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
@@ -51,7 +51,8 @@
      * @param metadataNode            whether the node is a metadata node as well
      * @param metadataPartition       metadata partition number
      * @param cleanup                 performs cleanup by deleting all unkept partitions
+     * @param ensureCompleteBootstrap ensures the metadata catalog was fully bootstrapped
      */
     void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions, boolean metadataNode,
-            int metadataPartition, boolean cleanup) throws HyracksDataException;
+            int metadataPartition, boolean cleanup, boolean ensureCompleteBootstrap) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 5dcaaf4..eb39f23 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -50,6 +50,7 @@
     public static final String DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME = "correlated-prefix";
     public static final Map<String, String> DEFAULT_COMPACTION_POLICY_PROPERTIES;
     public static final int METADATA_PARTITION = -1;
+    public static final String BOOTSTRAP_FILE_NAME = ".bootstrap";
 
     /**
      * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index e9a8753..28fd27e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.utils;
 
+import static org.apache.asterix.common.utils.StorageConstants.METADATA_PARTITION;
 import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
 import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
 
@@ -26,8 +27,11 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataConstants;
+import org.apache.asterix.common.metadata.Namespace;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -40,6 +44,7 @@
 import org.apache.hyracks.api.io.MappedFileSplit;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -226,4 +231,14 @@
     public static boolean isRelativeParent(FileReference parent, FileReference child) {
         return child.getRelativePath().startsWith(parent.getRelativePath());
     }
+
+    public static String getNamespacePath(INamespacePathResolver nsPathResolver, Namespace namespace, int partition) {
+        return FileUtil.joinPath(prepareStoragePartitionPath(partition), nsPathResolver.resolve(namespace));
+    }
+
+    public static String getBootstrapMarkerRelativePath(INamespacePathResolver namespacePathResolver) {
+        String metadataNamespacePath = StoragePathUtil.getNamespacePath(namespacePathResolver,
+                MetadataConstants.METADATA_NAMESPACE, METADATA_PARTITION);
+        return FileUtil.joinPath(metadataNamespacePath, StorageConstants.BOOTSTRAP_FILE_NAME);
+    }
 }