[ASTERIXDB-2195][REPL] Clean Masked Files
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Clean invalid masked files before promoting
a partition or sending the partition's file list
to the master.
- Let the replica calculate the component id instead
of having the master send it.
- Add tests for:
  - Deleting a masked component.
  - Deleting a masked file.
Change-Id: Ib0f0159159faf87b9f5fd2eca3956dd90633bcfa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2268
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 155fa1d..4edae69 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ReplicaManager implements IReplicaManager {
@@ -85,6 +86,9 @@
@Override
public synchronized void promote(int partition) throws HyracksDataException {
+ final PersistentLocalResourceRepository localResourceRepository =
+ (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+ localResourceRepository.cleanup(partition);
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
partitions.add(partition);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 128aee6..6d114c6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Optional;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -214,4 +215,13 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
}
+
+    /**
+     * Returns the path of the file split of {@code dataset} that is located on node {@code nodeId}.
+     * Fails with an assertion error if none of the dataset's splits is on that node.
+     *
+     * @param integrationUtil the integration util passed to the dataset split lookup
+     * @param dataset the dataset whose splits are searched
+     * @param nodeId the node name the wanted split must report
+     * @return the path of the first split whose node name equals {@code nodeId}
+     * @throws Exception propagated from the dataset split lookup
+     */
+    public static String getIndexPath(AsterixHyracksIntegrationUtil integrationUtil, Dataset dataset, String nodeId)
+            throws Exception {
+        final FileSplit[] datasetSplits = TestDataUtil.getDatasetSplits(integrationUtil, dataset);
+        final Optional<FileSplit> nodeFileSplit =
+                Arrays.stream(datasetSplits).filter(s -> s.getNodeName().equals(nodeId)).findFirst();
+        // name the node in the failure message so a missing split is easy to diagnose
+        Assert.assertTrue("No file split of the dataset found on node " + nodeId, nodeFileSplit.isPresent());
+        return nodeFileSplit.get().getPath();
+    }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
new file mode 100644
index 0000000..6401d90
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.io.FileReference;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link PersistentLocalResourceRepository#cleanup(int)} removes mask files
+ * together with the files they mask.
+ */
+public class PersistentLocalResourceRepositoryTest {
+
+    protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        integrationUtil.init(true, TEST_CONFIG_FILE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    @Test
+    public void deleteMaskedFiles() throws Exception {
+        // locate the index directory of a freshly created dataset on the first NC
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+        final String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+        final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
+        final FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
+        final String indexDir = indexDirRef.getFile().getAbsolutePath();
+
+        // case 1: a component mask plus two files that belong to the masked component
+        final String componentId = "12345_12345";
+        final Path componentMask = Paths.get(indexDir, StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId);
+        final Path btreePath = Paths.get(indexDir, componentId + "_b");
+        final Path filterPath = Paths.get(indexDir, componentId + "_f");
+        createEmptyFiles(componentMask, btreePath, filterPath);
+
+        // clean up the partition the dataset resource lives in
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository();
+        final DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource();
+        final int partition = lr.getPartition();
+        localResourceRepository.cleanup(partition);
+
+        // the mask and everything it masked must be gone
+        assertAllDeleted(componentMask, btreePath, filterPath);
+
+        // case 2: a plain mask that covers exactly one file
+        final String fileName = "someFile";
+        final Path fileMask = Paths.get(indexDir, StorageConstants.MASK_FILE_PREFIX + fileName);
+        final Path filePath = Paths.get(indexDir, fileName);
+        createEmptyFiles(fileMask, filePath);
+        localResourceRepository.cleanup(partition);
+
+        // the mask and the masked file must be gone
+        assertAllDeleted(fileMask, filePath);
+    }
+
+    /** Creates each of the given paths as an empty file, in the given order. */
+    private static void createEmptyFiles(Path... paths) throws Exception {
+        for (Path path : paths) {
+            Files.createFile(path);
+        }
+    }
+
+    /** Asserts that none of the given paths exists anymore. */
+    private static void assertAllDeleted(Path... paths) {
+        for (Path path : paths) {
+            Assert.assertFalse(path.toFile().exists());
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index f43f3ff..b14d70b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -79,7 +79,7 @@
final String datasetName = "ds";
TestDataUtil.createIdOnlyDataset(datasetName);
final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
- final String indexPath = getIndexPath(dataset, nodeId);
+ final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
final IDatasetLifecycleManager dclm = ncAppCtx.getDatasetLifecycleManager();
dclm.open(indexPath);
final ILSMIndex index = (ILSMIndex) dclm.get(indexPath);
@@ -185,14 +185,6 @@
interruptedLogPageSwitch();
}
- private static String getIndexPath(Dataset dataset, String nodeId) throws Exception {
- final FileSplit[] datasetSplits = TestDataUtil.getDatasetSplits(integrationUtil, dataset);
- final Optional<FileSplit> nodeFileSplit =
- Arrays.stream(datasetSplits).filter(s -> s.getNodeName().equals(nodeId)).findFirst();
- Assert.assertTrue(nodeFileSplit.isPresent());
- return nodeFileSplit.get().getPath();
- }
-
private static ITransactionContext beingTransaction(INcApplicationContext ncAppCtx, ILSMIndex index,
long resourceId) {
final TxnId txnId = new TxnId(1);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index f59914d..265c9fd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -36,6 +36,7 @@
public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_";
public static final String METADATA_FILE_NAME = ".metadata";
public static final String MASK_FILE_PREFIX = ".mask_";
+ public static final String COMPONENT_MASK_FILE_PREFIX = MASK_FILE_PREFIX + "C_";
public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
/**
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 26c9577..d5dc51d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -28,9 +28,10 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
@@ -40,13 +41,10 @@
*/
public class ComponentMaskTask implements IReplicaTask {
- private static final String COMPONENT_MASK_FILE_PREFIX = StorageConstants.MASK_FILE_PREFIX + "C_";
private final String file;
- private final String componentId;
- public ComponentMaskTask(String file, String componentId) {
+ public ComponentMaskTask(String file) {
this.file = file;
- this.componentId = componentId;
}
@Override
@@ -61,11 +59,12 @@
}
}
- public static Path getComponentMaskPath(INcApplicationContext appCtx, String file) throws IOException {
+ public static Path getComponentMaskPath(INcApplicationContext appCtx, String componentFile) throws IOException {
final IIOManager ioManager = appCtx.getIoManager();
- final FileReference localPath = ioManager.resolve(file);
+ final FileReference localPath = ioManager.resolve(componentFile);
final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
- return Paths.get(resourceDir.toString(), COMPONENT_MASK_FILE_PREFIX + localPath.getFile().getName());
+ final String componentId = PersistentLocalResourceRepository.getComponentId(componentFile);
+ return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId);
}
@Override
@@ -78,7 +77,6 @@
try {
final DataOutputStream dos = new DataOutputStream(out);
dos.writeUTF(file);
- dos.writeUTF(componentId);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -86,7 +84,6 @@
public static ComponentMaskTask create(DataInput input) throws IOException {
String indexFile = input.readUTF();
- String componentId = input.readUTF();
- return new ComponentMaskTask(indexFile, componentId);
+ return new ComponentMaskTask(indexFile);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index b972f32..54d3a02 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -45,9 +45,9 @@
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
- //TODO delete any invalid files with masks
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+ localResourceRepository.cleanup(partition);
final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream()
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
final PartitionResourcesListResponse response =
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 74f38e2..95ae690 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -22,11 +22,9 @@
import static org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation.REPLICATE;
import java.io.IOException;
-import java.nio.file.Paths;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.ComponentMaskTask;
@@ -39,7 +37,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -86,9 +83,8 @@
private void replicateComponent(PartitionReplica replica) throws IOException {
// send component header
final String anyFile = job.getAnyFile();
- final String lsmComponentID = getComponentId(anyFile);
final String indexFile = StoragePathUtil.getFileRelativePath(anyFile);
- final ComponentMaskTask maskTask = new ComponentMaskTask(indexFile, lsmComponentID);
+ final ComponentMaskTask maskTask = new ComponentMaskTask(indexFile);
ReplicationProtocol.sendTo(replica, maskTask);
ReplicationProtocol.waitForAck(replica);
// send component files
@@ -130,11 +126,4 @@
return ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
.getComponentLSN(ctx.getComponentsToBeReplicated());
}
-
- private static String getComponentId(String filePath) {
- final ResourceReference ref = ResourceReference.of(filePath);
- final String fileUniqueTimestamp =
- ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
- return Paths.get(ref.getRelativePath().toString(), fileUniqueTimestamp).toString();
- }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 54d6268..6ffeb28 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -49,6 +49,7 @@
import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.commons.io.FileUtils;
@@ -62,8 +63,11 @@
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -71,8 +75,11 @@
public class PersistentLocalResourceRepository implements ILocalResourceRepository {
public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME);
+ private static final Logger LOGGER = LogManager.getLogger();
private static final FilenameFilter LSM_INDEX_FILES_FILTER =
(dir, name) -> !name.startsWith(INDEX_CHECKPOINT_FILE_PREFIX);
+ private static final FilenameFilter MASK_FILES_FILTER =
+ (dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX);
private static final int MAX_CACHED_RESOURCES = 1000;
private static final IOFileFilter METADATA_FILES_FILTER = new IOFileFilter() {
@Override
@@ -349,4 +356,60 @@
}
}
}
+
+    /**
+     * Deletes every mask file found in the indexes of {@code partition}, along with the files
+     * each mask covers.
+     *
+     * @param partition the partition whose indexes are scanned for mask files
+     * @throws HyracksDataException wrapping the {@link IOException} raised when a masked file or a mask
+     *             cannot be deleted
+     */
+    public void cleanup(int partition) throws HyracksDataException {
+        for (File indexDir : getPartitionIndexes(partition)) {
+            final File[] maskFiles = indexDir.listFiles(MASK_FILES_FILTER);
+            if (maskFiles == null) {
+                // nothing could be listed for this index; move on to the next one
+                continue;
+            }
+            try {
+                for (File maskFile : maskFiles) {
+                    // remove what the mask covers first, then the mask itself
+                    deleteIndexMaskedFiles(indexDir, maskFile);
+                    Files.delete(maskFile.toPath());
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    /**
+     * Deletes the files in {@code index} that are covered by {@code mask}.
+     * A component mask covers every file whose name starts with the component id followed by the
+     * component file delimiter; any other mask covers the single file whose name follows the mask prefix.
+     *
+     * @param index the index directory that is searched for masked files
+     * @param mask the mask file
+     * @throws IOException if a masked file cannot be deleted
+     * @throws IllegalArgumentException if the name of {@code mask} does not start with the mask file prefix
+     */
+    private void deleteIndexMaskedFiles(File index, File mask) throws IOException {
+        if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
+            throw new IllegalArgumentException("Unrecognized mask file: " + mask);
+        }
+        File[] maskedFiles;
+        if (isComponentMask(mask)) {
+            final String componentId = mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
+            // match on the id plus the delimiter so that a different component whose id merely
+            // starts with this id (e.g. 1_10 vs 1_1) is left untouched
+            final String componentFilesPrefix = componentId + AbstractLSMIndexFileManager.DELIMITER;
+            maskedFiles = index.listFiles((dir, name) -> name.startsWith(componentFilesPrefix));
+        } else {
+            final String maskedFileName = mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
+            maskedFiles = index.listFiles((dir, name) -> name.equals(maskedFileName));
+        }
+        // listFiles yields null when nothing could be listed; there is nothing to delete then
+        if (maskedFiles != null) {
+            for (File maskedFile : maskedFiles) {
+                LOGGER.info(() -> "deleting masked file: " + maskedFile.getAbsolutePath());
+                Files.delete(maskedFile.toPath());
+            }
+        }
+    }
+
+    /**
+     * Derives the id of the component a component file belongs to: the file name with its
+     * last delimiter and everything after it removed.
+     * e.g. the component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
+     * yields the component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439
+     *
+     * @param componentFile any component file
+     * @return The component id
+     */
+    public static String getComponentId(String componentFile) {
+        final String fileName = ResourceReference.of(componentFile).getName();
+        final int idEnd = fileName.lastIndexOf(AbstractLSMIndexFileManager.DELIMITER);
+        return fileName.substring(0, idEnd);
+    }
+
+ private static boolean isComponentMask(File mask) {
+ return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
+ }
}