[NO ISSUE][STO] Limit flushes to impacted partitions
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- When requesting a flush, limit the indexes to be flushed
to the impacted partitions.
- Invalidate cached resources on replica promotion.
- Invalidate cached resources on resource file deletion.
Change-Id: I4c1408627c8e11240c3575c4b8f190d746588867
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15683
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 6736642..2ca3fbc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -535,7 +535,7 @@
}
replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false);
if (flush) {
- appCtx.getDatasetLifecycleManager().flushAllDatasets();
+ appCtx.getDatasetLifecycleManager().flushAllDatasets(partitions::contains);
}
cleanUp(partitions);
} catch (IOException | ACIDException e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index a159e09..1372016 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -130,6 +130,7 @@
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
localResourceRepository.cleanup(partition);
+ localResourceRepository.clearResourcesCache();
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
partitions.put(partition, new Object());
@@ -169,8 +170,7 @@
public void closePartitionResources(int partition) throws HyracksDataException {
final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
- //TODO(mhubail) we can flush only datasets of the requested partition
- datasetLifecycleManager.flushAllDatasets();
+ datasetLifecycleManager.flushAllDatasets(p -> p == partition);
final PersistentLocalResourceRepository resourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 44b83d4..1c2a047 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Set;
+import java.util.function.IntPredicate;
import java.util.function.Predicate;
import org.apache.asterix.common.context.DatasetInfo;
@@ -61,6 +62,14 @@
void flushAllDatasets() throws HyracksDataException;
/**
+ * Flushes all open datasets synchronously for partitions {@code partitions}
+ *
+ * @param partitions
+ * @throws HyracksDataException
+ */
+ void flushAllDatasets(IntPredicate partitions) throws HyracksDataException;
+
+ /**
* Schedules asynchronous flush on indexes matching the predicate {@code indexPredicate}
*
* @param indexPredicate
@@ -143,19 +152,13 @@
List<IndexInfo> getOpenIndexesInfo();
/**
- * Flushes and closes all user datasets (non-metadata datasets)
- *
- * @throws HyracksDataException
- */
- void closeUserDatasets() throws HyracksDataException;
-
- /**
* Flushes all opened datasets that are matching {@code replicationStrategy}.
*
* @param replicationStrategy
+ * @param partitions
* @throws HyracksDataException
*/
- void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+ void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException;
/**
* Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index c431dca..117b4fc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntPredicate;
import java.util.function.Predicate;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -362,9 +363,14 @@
@Override
public synchronized void flushAllDatasets() throws HyracksDataException {
+ flushAllDatasets(partition -> true);
+ }
+
+ @Override
+ public synchronized void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
if (dsr.getDatasetInfo().isOpen()) {
- flushDatasetOpenIndexes(dsr, false);
+ flushDatasetOpenIndexes(dsr, partitions, false);
}
}
}
@@ -373,7 +379,7 @@
public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
DatasetResource dsr = datasets.get(datasetId);
if (dsr != null) {
- flushDatasetOpenIndexes(dsr, asyncFlush);
+ flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
}
}
@@ -407,7 +413,8 @@
* This method can only be called asynchronously safely if we're sure no modify operation
* will take place until the flush is scheduled
*/
- private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
+ private void flushDatasetOpenIndexes(DatasetResource dsr, IntPredicate partitions, boolean asyncFlush)
+ throws HyracksDataException {
DatasetInfo dsInfo = dsr.getDatasetInfo();
if (!dsInfo.isOpen()) {
throw new IllegalStateException("flushDatasetOpenIndexes is called on a dataset that is closed");
@@ -419,6 +426,9 @@
// ensure all in-flight flushes gets scheduled
logManager.log(waitLog);
for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+ if (!partitions.test(primaryOpTracker.getPartition())) {
+ continue;
+ }
// flush each partition one by one
int numActiveOperations = primaryOpTracker.getNumActiveOperations();
if (numActiveOperations > 0) {
@@ -433,6 +443,9 @@
if (!asyncFlush) {
List<FlushOperation> flushes = new ArrayList<>();
for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+ if (!partitions.test(primaryOpTracker.getPartition())) {
+ continue;
+ }
flushes.addAll(primaryOpTracker.getScheduledFlushes());
}
LSMIndexUtil.waitFor(flushes);
@@ -443,7 +456,7 @@
// First wait for any ongoing IO operations
DatasetInfo dsInfo = dsr.getDatasetInfo();
try {
- flushDatasetOpenIndexes(dsr, false);
+ flushDatasetOpenIndexes(dsr, p -> true, false);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -480,16 +493,6 @@
}
@Override
- public synchronized void closeUserDatasets() throws HyracksDataException {
- ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
- for (DatasetResource dsr : openDatasets) {
- if (!dsr.isMetadataDataset()) {
- closeDataset(dsr);
- }
- }
- }
-
- @Override
public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
if (stopped) {
return;
@@ -539,10 +542,11 @@
}
@Override
- public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+ public void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions)
+ throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
- flushDatasetOpenIndexes(dsr, false);
+ flushDatasetOpenIndexes(dsr, partitions, false);
}
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index 5eef84e..1e93228 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -27,8 +27,10 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.logging.log4j.LogManager;
@@ -53,6 +55,11 @@
final File localFile = ioManager.resolve(file).getFile();
if (localFile.exists()) {
Files.delete(localFile.toPath());
+ ResourceReference replicaRes = ResourceReference.of(localFile.getAbsolutePath());
+ if (replicaRes.isMetadataResource()) {
+ ((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
+ .invalidateResource(replicaRes.getRelativePath().toString());
+ }
LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath());
} else {
LOGGER.warn(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index a8eee4f..0d0ef19 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -65,7 +65,8 @@
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery);
// flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
- appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
+ appCtx.getDatasetLifecycleManager().flushDataset(replStrategy,
+ p -> p == replica.getIdentifier().getPartition());
waitForReplicatedDatasetsIO();
fileSync.sync();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cc396ad..27e753f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -300,6 +300,10 @@
resourceCache.invalidate(relativePath);
}
+ public void clearResourcesCache() {
+ resourceCache.invalidateAll();
+ }
+
private static String getFileName(String path) {
return path.endsWith(File.separator) ? (path + StorageConstants.METADATA_FILE_NAME)
: (path + File.separator + StorageConstants.METADATA_FILE_NAME);