[NO ISSUE][STO] Delete Corrupted Resources Metadata Files

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Use a mask file to determiner whether a resource metadata
  file was completely written to disk or not.
- Delete corrupted resources metadata files that were
  not completely written to disk on NC startup/shutdown.
- Add test case.

Change-Id: Ib205453a6620734930b9dde7652277d2c5a1d6ed
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2763
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index dc5582b..b7b7c39 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -237,6 +237,7 @@
         final Set<Integer> nodePartitions = runtimeContext.getReplicaManager().getPartitions();
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
+        localResourceRepository.deleteCorruptedResources();
         for (Integer partition : nodePartitions) {
             localResourceRepository.cleanup(partition);
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index fb1adde..694a0c7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -166,4 +166,26 @@
                 .filter(file -> file.getName().startsWith(validComponentTimestamp.get())).count();
         Assert.assertTrue(validComponentFilesCount > 0);
     }
+
+    @Test
+    public void deleteCorruptedResourcesTest() throws Exception {
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+        final String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+        final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
+        final FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
+        final File indexMetadataFile = new File(indexDirRef.getFile(), StorageConstants.METADATA_FILE_NAME);
+        Assert.assertTrue(indexMetadataFile.exists());
+        // forge a mask file and ensure the metadata file and its mask files will be deleted after restart
+        final File indexMetadataMaskFile = new File(indexDirRef.getFile(),
+                StorageConstants.MASK_FILE_PREFIX + StorageConstants.METADATA_FILE_NAME);
+        Files.createFile(indexMetadataMaskFile.toPath());
+        Assert.assertTrue(indexMetadataMaskFile.exists());
+        integrationUtil.deinit(false);
+        integrationUtil.init(false, TEST_CONFIG_FILE_NAME);
+        Assert.assertFalse(indexMetadataFile.exists());
+        Assert.assertFalse(indexMetadataMaskFile.exists());
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 56b5910..7fe1d1f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -19,15 +19,14 @@
 package org.apache.asterix.transaction.management.resource;
 
 import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
 import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
 import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -86,6 +85,8 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String METADATA_FILE_MASK_NAME =
+            StorageConstants.MASK_FILE_PREFIX + StorageConstants.METADATA_FILE_NAME;
     private static final FilenameFilter LSM_INDEX_FILES_FILTER =
             (dir, name) -> !name.startsWith(INDEX_CHECKPOINT_FILE_PREFIX);
     private static final FilenameFilter MASK_FILES_FILTER =
@@ -103,6 +104,18 @@
         }
     };
 
+    private static final IOFileFilter METADATA_MASK_FILES_FILTER = new IOFileFilter() {
+        @Override
+        public boolean accept(File file) {
+            return file.getName().equals(METADATA_FILE_MASK_NAME);
+        }
+
+        @Override
+        public boolean accept(File dir, String name) {
+            return false;
+        }
+    };
+
     private static final IOFileFilter ALL_DIR_FILTER = new IOFileFilter() {
         @Override
         public boolean accept(File file) {
@@ -181,18 +194,15 @@
         if (!parent.exists() && !parent.mkdirs()) {
             throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
         }
-
-        try (FileOutputStream fos = new FileOutputStream(resourceFile.getFile());
-                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
-            oosToFos.writeObject(resource);
-            oosToFos.flush();
+        createResourceFileMask(resourceFile);
+        try {
             byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
             final Path path = Paths.get(resourceFile.getAbsolutePath());
             Files.write(path, bytes);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
-
+        deleteResourceFileMask(resourceFile);
         resourceCache.put(resource.getPath(), resource);
         indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(null, 0);
         //if replication enabled, send resource metadata info to remote nodes
@@ -418,6 +428,20 @@
         return resourcesStats;
     }
 
+    public void deleteCorruptedResources() throws HyracksDataException {
+        for (Path root : storageRoots) {
+            final Collection<File> metadataMaskFiles =
+                    FileUtils.listFiles(root.toFile(), METADATA_MASK_FILES_FILTER, ALL_DIR_FILTER);
+            for (File metadataMaskFile : metadataMaskFiles) {
+                final File resourceFile = new File(metadataMaskFile.getParent(), METADATA_FILE_NAME);
+                if (resourceFile.exists()) {
+                    IoUtil.delete(resourceFile);
+                }
+                IoUtil.delete(metadataMaskFile);
+            }
+        }
+    }
+
     private void deleteIndexMaskedFiles(File index) throws IOException {
         File[] masks = index.listFiles(MASK_FILES_FILTER);
         if (masks != null) {
@@ -514,6 +538,24 @@
         return null;
     }
 
+    private void createResourceFileMask(FileReference resourceFile) throws HyracksDataException {
+        Path maskFile = getResourceMaskFilePath(resourceFile);
+        try {
+            Files.createFile(maskFile);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void deleteResourceFileMask(FileReference resourceFile) throws HyracksDataException {
+        Path maskFile = getResourceMaskFilePath(resourceFile);
+        IoUtil.delete(maskFile);
+    }
+
+    private Path getResourceMaskFilePath(FileReference resourceFile) {
+        return Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), METADATA_FILE_MASK_NAME);
+    }
+
     /**
      * Gets a component id based on its unique timestamp.
      * e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 03227ee..09ecb15 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -44,6 +45,16 @@
     }
 
     /**
+     * Deletes a file
+     *
+     * @param filePath the file path to be deleted
+     * @throws HyracksDataException if the file couldn't be deleted
+     */
+    public static void delete(Path filePath) throws HyracksDataException {
+        delete(filePath.toFile());
+    }
+
+    /**
      * Delete a file
      *
      * @param fileRef the file to be deleted