[ASTERIXDB-2234][STO] Delete Invalid Components

- user model changes: no
- storage format changes: no
- interface changes: yes
  - IIndexCheckpointManager:
    (+) getValidComponentTimestamp
    (+) getCheckpointCount
    (-) advanceLowWatermark

Details:
- Delete any component with start timestamp
  after the index checkpoint valid component
  timestamp on NC startup/shutdown.
- Add test case for deleting invalid components.

Change-Id: Ib11782edd79c7ef0c8949cb08e863b0ec1687a87
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2270
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 835de47..d3ef028 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IndexCheckpoint;
@@ -91,12 +92,12 @@
     }
 
     @Override
-    public synchronized long getLowWatermark() throws HyracksDataException {
+    public synchronized long getLowWatermark() {
         return getLatest().getLowWatermark();
     }
 
     @Override
-    public synchronized boolean isFlushed(long masterLsn) throws HyracksDataException {
+    public synchronized boolean isFlushed(long masterLsn) {
         if (masterLsn == BULKLOAD_LSN) {
             return true;
         }
@@ -104,13 +105,19 @@
     }
 
     @Override
-    public synchronized void advanceLowWatermark(long lsn) throws HyracksDataException {
-        flushed(getLatest().getValidComponentTimestamp(), lsn);
+    public synchronized void delete() {
+        deleteHistory(Long.MAX_VALUE, 0);
     }
 
     @Override
-    public synchronized void delete() {
-        deleteHistory(Long.MAX_VALUE, 0);
+    public Optional<String> getValidComponentTimestamp() {
+        final String validComponentTimestamp = getLatest().getValidComponentTimestamp();
+        return validComponentTimestamp != null ? Optional.of(validComponentTimestamp) : Optional.empty();
+    }
+
+    @Override
+    public int getCheckpointCount() {
+        return getCheckpoints().size();
     }
 
     private IndexCheckpoint getLatest() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index e22372a..77913ca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -18,10 +18,10 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.api.http.server.StorageApiServlet;
@@ -35,16 +35,15 @@
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.config.NodeProperties;
 import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.PrintUtil;
 import org.apache.asterix.common.utils.Servlets;
-import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.util.MetadataBuiltinFunctions;
 import org.apache.asterix.utils.CompatibilityUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -235,10 +234,13 @@
     private void performLocalCleanUp() throws HyracksDataException {
         //Delete working area files from failed jobs
         runtimeContext.getIoManager().deleteWorkspaceFiles();
-
-        //TODO
-        //Reclaim storage for orphaned index artifacts in NCs.
-        //Note: currently LSM indexes invalid components are deleted when an index is activated.
+        // Reclaim storage for orphaned index artifacts in NCs.
+        final Set<Integer> nodePartitions = runtimeContext.getReplicaManager().getPartitions();
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
+        for (Integer partition : nodePartitions) {
+            localResourceRepository.cleanup(partition);
+        }
     }
 
     private void updateOnNodeJoin() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 6401d90..fb1adde 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -18,19 +18,33 @@
  */
 package org.apache.asterix.test.storage;
 
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
+
+import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.common.TestDataUtil;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,4 +110,60 @@
         Assert.assertFalse(maskPath.toFile().exists());
         Assert.assertFalse(filePath.toFile().exists());
     }
+
+    @Test
+    public void deleteInvalidComponents() throws Exception {
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+        final String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+        final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
+        PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository();
+        DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource();
+        // ensure cleaning index without any components will not have any impact
+        localResourceRepository.cleanup(lr.getPartition());
+
+        // generate disk component (insert + flush)
+        TestDataUtil.upsertData(datasetName, 100);
+        ncAppCtx.getDatasetLifecycleManager().flushDataset(dataset.getDatasetId(), false);
+
+        // create new invalid component with a timestamp > checkpoint valid component timestamp (i.e. in the future)
+        Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
+        Date futureTime = new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5));
+        String invalidComponentTimestamp =
+                formatter.format(futureTime) + AbstractLSMIndexFileManager.DELIMITER + formatter.format(futureTime);
+        FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
+        String indexDir = indexDirRef.getFile().getAbsolutePath();
+        // create the invalid component files
+        Path btreePath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+                + AbstractLSMIndexFileManager.BTREE_SUFFIX);
+        Path filterPath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+                + AbstractLSMIndexFileManager.BLOOM_FILTER_SUFFIX);
+        Files.createFile(btreePath);
+        Files.createFile(filterPath);
+
+        // clean up the index partition
+        localResourceRepository.cleanup(lr.getPartition());
+        // ensure that the invalid component was deleted
+        Assert.assertFalse(btreePath.toFile().exists());
+        Assert.assertFalse(filterPath.toFile().exists());
+
+        // ensure that valid components still exist
+        // find index valid component timestamp from checkpoint
+        LocalResource localResource = localResourceRepository.get(indexPath);
+        DatasetResourceReference drr = DatasetResourceReference.of(localResource);
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
+        IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
+        Optional<String> validComponentTimestamp = indexCheckpointManager.getValidComponentTimestamp();
+        Assert.assertTrue(validComponentTimestamp.isPresent());
+
+        File[] indexRemainingFiles =
+                indexDirRef.getFile().listFiles(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+        Assert.assertNotNull(indexRemainingFiles);
+        long validComponentFilesCount = Arrays.stream(indexRemainingFiles)
+                .filter(file -> file.getName().startsWith(validComponentTimestamp.get())).count();
+        Assert.assertTrue(validComponentFilesCount > 0);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index afa3823..b008f11 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.storage;
 
+import java.util.Optional;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IIndexCheckpointManager {
@@ -79,15 +81,21 @@
     boolean isFlushed(long masterLsn) throws HyracksDataException;
 
     /**
-     * Advance the index low watermark to {@code lsn}
-     *
-     * @param lsn
-     * @throws HyracksDataException
-     */
-    void advanceLowWatermark(long lsn) throws HyracksDataException;
-
-    /**
      * Deletes all checkpoints
      */
     void delete();
+
+    /**
+     * Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()}
+     *
+     * @return the index last valid component timestamp
+     */
+    Optional<String> getValidComponentTimestamp();
+
+    /**
+     * Gets the number of valid checkpoints the index has.
+     *
+     * @return the number of valid checkpoints
+     */
+    int getCheckpointCount();
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 6ffeb28..71f6243 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -20,6 +20,8 @@
 
 import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -31,8 +33,12 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.text.Format;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,6 +54,7 @@
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
@@ -105,6 +112,9 @@
         }
     };
 
+    private static final ThreadLocal<SimpleDateFormat> THREAD_LOCAL_FORMATTER =
+            ThreadLocal.withInitial(() -> new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT));
+
     // Finals
     private final IIOManager ioManager;
     private final Cache<String, LocalResource> resourceCache;
@@ -359,23 +369,66 @@
 
     public void cleanup(int partition) throws HyracksDataException {
         final Set<File> partitionIndexes = getPartitionIndexes(partition);
-        // find masks
-        for (File index : partitionIndexes) {
-            File[] masks = index.listFiles(MASK_FILES_FILTER);
-            if (masks != null) {
-                try {
-                    for (File mask : masks) {
-                        deleteIndexMaskedFiles(index, mask);
-                        // delete the mask itself
-                        Files.delete(mask.toPath());
-                    }
-                } catch (IOException e) {
-                    throw HyracksDataException.create(e);
+        try {
+            for (File index : partitionIndexes) {
+                deleteIndexMaskedFiles(index);
+                if (isValidIndex(index)) {
+                    deleteIndexInvalidComponents(index);
+                }
+            }
+        } catch (IOException | ParseException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void deleteIndexMaskedFiles(File index) throws IOException {
+        File[] masks = index.listFiles(MASK_FILES_FILTER);
+        if (masks != null) {
+            for (File mask : masks) {
+                deleteIndexMaskedFiles(index, mask);
+                // delete the mask itself
+                Files.delete(mask.toPath());
+            }
+        }
+    }
+
+    private boolean isValidIndex(File index) throws IOException {
+        // any index without any checkpoint files is invalid
+        // this can happen if a crash happens when the index metadata file is created
+        // but before the initial checkpoint is persisted. The index metadata file will
+        // be deleted and recreated when the index is created again
+        return getIndexCheckpointManager(index).getCheckpointCount() != 0;
+    }
+
+    private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
+        final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
+        if (!validComponentTimestamp.isPresent()) {
+            // index doesn't have any components
+            return;
+        }
+        final Format formatter = THREAD_LOCAL_FORMATTER.get();
+        final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
+        final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
+        if (indexComponentFiles != null) {
+            for (File componentFile : indexComponentFiles) {
+                // delete any file with startTime > validTimestamp
+                final String fileStartTimeStr =
+                        AbstractLSMIndexFileManager.getComponentStartTime(componentFile.getName());
+                final Date fileStartTime = (Date) formatter.parseObject(fileStartTimeStr);
+                if (fileStartTime.after(validTimestamp)) {
+                    LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
+                    Files.delete(componentFile.toPath());
                 }
             }
         }
     }
 
+    private IIndexCheckpointManager getIndexCheckpointManager(File index) throws HyracksDataException {
+        final String indexFile = Paths.get(index.getAbsolutePath(), StorageConstants.METADATA_FILE_NAME).toString();
+        final ResourceReference indexRef = ResourceReference.of(indexFile);
+        return indexCheckpointManagerProvider.get(indexRef);
+    }
+
     private void deleteIndexMaskedFiles(File index, File mask) throws IOException {
         if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
             throw new IllegalArgumentException("Unrecognized mask file: " + mask);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index a6ceba8..59a919b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -77,7 +77,9 @@
      */
     public static final String TXN_PREFIX = ".T";
 
-    protected static final FilenameFilter fileNameFilter = (dir, name) -> !name.startsWith(".");
+    public static final String COMPONENT_TIMESTAMP_FORMAT = "yyyy-MM-dd-HH-mm-ss-SSS";
+
+    public static final FilenameFilter COMPONENT_FILES_FILTER = (dir, name) -> !name.startsWith(".");
     protected static final FilenameFilter txnFileNameFilter = (dir, name) -> name.startsWith(TXN_PREFIX);
     protected static FilenameFilter bloomFilterFilter =
             (dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX);
@@ -87,7 +89,7 @@
     protected final IIOManager ioManager;
     // baseDir should reflect dataset name and partition name and be absolute
     protected final FileReference baseDir;
-    protected final Format formatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
+    protected final Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
     protected final Comparator<ComparableFileName> recencyCmp = new RecencyComparator();
     protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
     private String prevTimestamp = null;
@@ -222,7 +224,7 @@
         // (1) The isValid flag is not set
         // (2) The file's interval is contained by some other file
         // Here, we only filter out (1).
-        cleanupAndGetValidFilesInternal(fileNameFilter, treeFactory, allFiles);
+        cleanupAndGetValidFilesInternal(COMPONENT_FILES_FILTER, treeFactory, allFiles);
 
         if (allFiles.isEmpty()) {
             return validFiles;
@@ -412,6 +414,10 @@
         return ts;
     }
 
+    public static String getComponentStartTime(String fileName) {
+        return fileName.split(DELIMITER)[0];
+    }
+
     public static String getComponentEndTime(String fileName) {
         return fileName.split(DELIMITER)[1];
     }