[ASTERIXDB-2234][STO] Delete Invalid Components
- user model changes: no
- storage format changes: no
- interface changes: yes
- IIndexCheckpointManager:
(+) getValidComponentTimestamp
(+) getCheckpointCount
(-) advanceLowWatermark
Details:
- Delete any component with start timestamp
after the index checkpoint valid component
timestamp on NC startup/shutdown.
- Add test case for deleting invalid components.
Change-Id: Ib11782edd79c7ef0c8949cb08e863b0ec1687a87
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2270
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 835de47..d3ef028 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Optional;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IndexCheckpoint;
@@ -91,12 +92,12 @@
}
@Override
- public synchronized long getLowWatermark() throws HyracksDataException {
+ public synchronized long getLowWatermark() {
return getLatest().getLowWatermark();
}
@Override
- public synchronized boolean isFlushed(long masterLsn) throws HyracksDataException {
+ public synchronized boolean isFlushed(long masterLsn) {
if (masterLsn == BULKLOAD_LSN) {
return true;
}
@@ -104,13 +105,19 @@
}
@Override
- public synchronized void advanceLowWatermark(long lsn) throws HyracksDataException {
- flushed(getLatest().getValidComponentTimestamp(), lsn);
+ public synchronized void delete() {
+ deleteHistory(Long.MAX_VALUE, 0);
}
@Override
- public synchronized void delete() {
- deleteHistory(Long.MAX_VALUE, 0);
+ public Optional<String> getValidComponentTimestamp() {
+ final String validComponentTimestamp = getLatest().getValidComponentTimestamp();
+ return validComponentTimestamp != null ? Optional.of(validComponentTimestamp) : Optional.empty();
+ }
+
+ @Override
+ public int getCheckpointCount() {
+ return getCheckpoints().size();
}
private IndexCheckpoint getLatest() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index e22372a..77913ca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -18,10 +18,10 @@
*/
package org.apache.asterix.hyracks.bootstrap;
-import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.api.http.server.StorageApiServlet;
@@ -35,16 +35,15 @@
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.config.NodeProperties;
import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.utils.PrintUtil;
import org.apache.asterix.common.utils.Servlets;
-import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.asterix.utils.CompatibilityUtil;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -235,10 +234,13 @@
private void performLocalCleanUp() throws HyracksDataException {
//Delete working area files from failed jobs
runtimeContext.getIoManager().deleteWorkspaceFiles();
-
- //TODO
- //Reclaim storage for orphaned index artifacts in NCs.
- //Note: currently LSM indexes invalid components are deleted when an index is activated.
+ // Reclaim storage for orphaned index artifacts in NCs.
+ final Set<Integer> nodePartitions = runtimeContext.getReplicaManager().getPartitions();
+ final PersistentLocalResourceRepository localResourceRepository =
+ (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
+ for (Integer partition : nodePartitions) {
+ localResourceRepository.cleanup(partition);
+ }
}
private void updateOnNodeJoin() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 6401d90..fb1adde 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -18,19 +18,33 @@
*/
package org.apache.asterix.test.storage;
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
+
+import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.TestDataUtil;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.common.LocalResource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -96,4 +110,60 @@
Assert.assertFalse(maskPath.toFile().exists());
Assert.assertFalse(filePath.toFile().exists());
}
+
+ @Test
+ public void deleteInvalidComponents() throws Exception {
+ final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+ final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+ final String datasetName = "ds";
+ TestDataUtil.createIdOnlyDataset(datasetName);
+ final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+ final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
+ PersistentLocalResourceRepository localResourceRepository =
+ (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository();
+ DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource();
+ // ensure cleaning index without any components will not have any impact
+ localResourceRepository.cleanup(lr.getPartition());
+
+ // generate disk component (insert + flush)
+ TestDataUtil.upsertData(datasetName, 100);
+ ncAppCtx.getDatasetLifecycleManager().flushDataset(dataset.getDatasetId(), false);
+
+ // create new invalid component with a timestamp > checkpoint valid component timestamp (i.e. in the future)
+ Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
+ Date futureTime = new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5));
+ String invalidComponentTimestamp =
+ formatter.format(futureTime) + AbstractLSMIndexFileManager.DELIMITER + formatter.format(futureTime);
+ FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
+ String indexDir = indexDirRef.getFile().getAbsolutePath();
+ // create the invalid component files
+ Path btreePath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+ + AbstractLSMIndexFileManager.BTREE_SUFFIX);
+ Path filterPath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+ + AbstractLSMIndexFileManager.BLOOM_FILTER_SUFFIX);
+ Files.createFile(btreePath);
+ Files.createFile(filterPath);
+
+ // clean up the index partition
+ localResourceRepository.cleanup(lr.getPartition());
+ // ensure that the invalid component was deleted
+ Assert.assertFalse(btreePath.toFile().exists());
+ Assert.assertFalse(filterPath.toFile().exists());
+
+ // ensure that valid components still exist
+ // find index valid component timestamp from checkpoint
+ LocalResource localResource = localResourceRepository.get(indexPath);
+ DatasetResourceReference drr = DatasetResourceReference.of(localResource);
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
+ IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
+ Optional<String> validComponentTimestamp = indexCheckpointManager.getValidComponentTimestamp();
+ Assert.assertTrue(validComponentTimestamp.isPresent());
+
+ File[] indexRemainingFiles =
+ indexDirRef.getFile().listFiles(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+ Assert.assertNotNull(indexRemainingFiles);
+ long validComponentFilesCount = Arrays.stream(indexRemainingFiles)
+ .filter(file -> file.getName().startsWith(validComponentTimestamp.get())).count();
+ Assert.assertTrue(validComponentFilesCount > 0);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index afa3823..b008f11 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.storage;
+import java.util.Optional;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IIndexCheckpointManager {
@@ -79,15 +81,21 @@
boolean isFlushed(long masterLsn) throws HyracksDataException;
/**
- * Advance the index low watermark to {@code lsn}
- *
- * @param lsn
- * @throws HyracksDataException
- */
- void advanceLowWatermark(long lsn) throws HyracksDataException;
-
- /**
* Deletes all checkpoints
*/
void delete();
+
+ /**
+ * Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()}
+ *
+ * @return the index last valid component timestamp
+ */
+ Optional<String> getValidComponentTimestamp();
+
+ /**
+ * Gets the number of valid checkpoints the index has.
+ *
+ * @return the number of valid checkpoints
+ */
+ int getCheckpointCount();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 6ffeb28..71f6243 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -20,6 +20,8 @@
import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
import java.io.File;
import java.io.FileInputStream;
@@ -31,8 +33,12 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.text.Format;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -48,6 +54,7 @@
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
@@ -105,6 +112,9 @@
}
};
+ private static final ThreadLocal<SimpleDateFormat> THREAD_LOCAL_FORMATTER =
+ ThreadLocal.withInitial(() -> new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT));
+
// Finals
private final IIOManager ioManager;
private final Cache<String, LocalResource> resourceCache;
@@ -359,23 +369,66 @@
public void cleanup(int partition) throws HyracksDataException {
final Set<File> partitionIndexes = getPartitionIndexes(partition);
- // find masks
- for (File index : partitionIndexes) {
- File[] masks = index.listFiles(MASK_FILES_FILTER);
- if (masks != null) {
- try {
- for (File mask : masks) {
- deleteIndexMaskedFiles(index, mask);
- // delete the mask itself
- Files.delete(mask.toPath());
- }
- } catch (IOException e) {
- throw HyracksDataException.create(e);
+ try {
+ for (File index : partitionIndexes) {
+ deleteIndexMaskedFiles(index);
+ if (isValidIndex(index)) {
+ deleteIndexInvalidComponents(index);
+ }
+ }
+ } catch (IOException | ParseException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void deleteIndexMaskedFiles(File index) throws IOException {
+ File[] masks = index.listFiles(MASK_FILES_FILTER);
+ if (masks != null) {
+ for (File mask : masks) {
+ deleteIndexMaskedFiles(index, mask);
+ // delete the mask itself
+ Files.delete(mask.toPath());
+ }
+ }
+ }
+
+ private boolean isValidIndex(File index) throws IOException {
+ // any index without any checkpoint files is invalid
+ // this can happen if a crash happens when the index metadata file is created
+ // but before the initial checkpoint is persisted. The index metadata file will
+ // be deleted and recreated when the index is created again
+ return getIndexCheckpointManager(index).getCheckpointCount() != 0;
+ }
+
+ private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
+ final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
+ if (!validComponentTimestamp.isPresent()) {
+ // index doesn't have any components
+ return;
+ }
+ final Format formatter = THREAD_LOCAL_FORMATTER.get();
+ final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
+ final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
+ if (indexComponentFiles != null) {
+ for (File componentFile : indexComponentFiles) {
+ // delete any file with startTime > validTimestamp
+ final String fileStartTimeStr =
+ AbstractLSMIndexFileManager.getComponentStartTime(componentFile.getName());
+ final Date fileStartTime = (Date) formatter.parseObject(fileStartTimeStr);
+ if (fileStartTime.after(validTimestamp)) {
+ LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
+ Files.delete(componentFile.toPath());
}
}
}
}
+ private IIndexCheckpointManager getIndexCheckpointManager(File index) throws HyracksDataException {
+ final String indexFile = Paths.get(index.getAbsolutePath(), StorageConstants.METADATA_FILE_NAME).toString();
+ final ResourceReference indexRef = ResourceReference.of(indexFile);
+ return indexCheckpointManagerProvider.get(indexRef);
+ }
+
private void deleteIndexMaskedFiles(File index, File mask) throws IOException {
if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
throw new IllegalArgumentException("Unrecognized mask file: " + mask);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index a6ceba8..59a919b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -77,7 +77,9 @@
*/
public static final String TXN_PREFIX = ".T";
- protected static final FilenameFilter fileNameFilter = (dir, name) -> !name.startsWith(".");
+ public static final String COMPONENT_TIMESTAMP_FORMAT = "yyyy-MM-dd-HH-mm-ss-SSS";
+
+ public static final FilenameFilter COMPONENT_FILES_FILTER = (dir, name) -> !name.startsWith(".");
protected static final FilenameFilter txnFileNameFilter = (dir, name) -> name.startsWith(TXN_PREFIX);
protected static FilenameFilter bloomFilterFilter =
(dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX);
@@ -87,7 +89,7 @@
protected final IIOManager ioManager;
// baseDir should reflect dataset name and partition name and be absolute
protected final FileReference baseDir;
- protected final Format formatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
+ protected final Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
protected final Comparator<ComparableFileName> recencyCmp = new RecencyComparator();
protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
private String prevTimestamp = null;
@@ -222,7 +224,7 @@
// (1) The isValid flag is not set
// (2) The file's interval is contained by some other file
// Here, we only filter out (1).
- cleanupAndGetValidFilesInternal(fileNameFilter, treeFactory, allFiles);
+ cleanupAndGetValidFilesInternal(COMPONENT_FILES_FILTER, treeFactory, allFiles);
if (allFiles.isEmpty()) {
return validFiles;
@@ -412,6 +414,10 @@
return ts;
}
+ public static String getComponentStartTime(String fileName) {
+ return fileName.split(DELIMITER)[0];
+ }
+
public static String getComponentEndTime(String fileName) {
return fileName.split(DELIMITER)[1];
}