[ASTERIXDB-3220][REPL] Allow waiting for IO on specific dataset partition
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add API to wait for IO on a specific dataset partition.
- When waiting for a partition replica IO ops to finish, only wait
for the replica partition rather than all partitions.
Change-Id: I90f311f602b3c8526556f64d7b25672981fac320
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 1c2a047..c7eee21 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -161,12 +161,14 @@
void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException;
/**
- * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}.
+ * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy} and
+ * {@code partition}.
*
* @param replicationStrategy
+ * @param partition
* @throws HyracksDataException
*/
- void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+ void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException;
/**
* @return the current datasets io stats
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 5964bb4..7d3dba4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -33,17 +33,19 @@
private static final Logger LOGGER = LogManager.getLogger();
protected final int datasetID;
protected final DatasetInfo dsInfo;
+ protected final int partition;
- public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
+ public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int partition) {
this.datasetID = datasetID;
this.dsInfo = dsInfo;
+ this.partition = partition;
}
@Override
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
- dsInfo.declareActiveIOOperation(REPLICATE);
+ dsInfo.declareActiveIOOperation(REPLICATE, partition);
}
}
@@ -59,7 +61,7 @@
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
- dsInfo.undeclareActiveIOOperation(REPLICATE);
+ dsInfo.undeclareActiveIOOperation(REPLICATE, partition);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index d15d9be..87a3c2f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -33,12 +33,16 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
private static final Logger LOGGER = LogManager.getLogger();
// partition -> index
private final Map<Integer, Set<IndexInfo>> partitionIndexes;
// resourceID -> index
private final Map<Long, IndexInfo> indexes;
+ private final Int2IntMap partitionPendingIO;
private final int datasetID;
private final ILogManager logManager;
private final LogRecord waitLog = new LogRecord();
@@ -54,6 +58,7 @@
public DatasetInfo(int datasetID, ILogManager logManager) {
this.partitionIndexes = new HashMap<>();
this.indexes = new HashMap<>();
+ this.partitionPendingIO = new Int2IntOpenHashMap();
this.setLastAccess(-1);
this.datasetID = datasetID;
this.setRegistered(false);
@@ -74,7 +79,8 @@
setLastAccess(System.currentTimeMillis());
}
- public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+ public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) {
+ partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) + 1);
numActiveIOOps++;
switch (opType) {
case FLUSH:
@@ -91,7 +97,8 @@
}
}
- public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+ public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) {
+ partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) - 1);
numActiveIOOps--;
switch (opType) {
case FLUSH:
@@ -253,6 +260,26 @@
}
}
+ public void waitForIO(int partition) throws HyracksDataException {
+ logManager.log(waitLog);
+ synchronized (this) {
+ while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
+ LOGGER.error("number of IO operations cannot be negative for dataset {}, partition {}", this,
+ partition);
+ throw new IllegalStateException(
+ "Number of IO operations cannot be negative: " + this + ", partition " + partition);
+ }
+ }
+ }
+
public synchronized int getPendingFlushes() {
return pendingFlushes;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc..4fc9dd6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -552,10 +552,10 @@
}
@Override
- public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+ public void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
- dsr.getDatasetInfo().waitForIO();
+ dsr.getDatasetInfo().waitForIO(partition);
}
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02..a4ad7cf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,7 +59,6 @@
@NotThreadSafe
public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
private static final Logger LOGGER = LogManager.getLogger();
- private final int partition;
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
private final ILogManager logManager;
@@ -71,8 +70,7 @@
public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
- super(datasetID, dsInfo);
- this.partition = partition;
+ super(datasetID, dsInfo, partition);
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
this.idGenerator = idGenerator;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51..f56e5c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -74,6 +74,7 @@
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
protected final DatasetInfo dsInfo;
protected final ILSMIndex lsmIndex;
+ private final int partition;
private long firstLsnForCurrentMemoryComponent = 0L;
private long persistenceLsn = 0L;
private int pendingFlushes = 0;
@@ -84,6 +85,7 @@
this.dsInfo = dsInfo;
this.lsmIndex = lsmIndex;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+ this.partition = ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
componentIds.add(componentId);
}
@@ -259,7 +261,7 @@
@Override
public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException {
- dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
+ dsInfo.declareActiveIOOperation(operation.getIOOpertionType(), partition);
if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
pendingFlushes++;
FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +284,7 @@
pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
}
}
- dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
+ dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(), partition);
}
public synchronized boolean hasPendingFlush() {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index db0911b..33d513f 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -77,6 +77,7 @@
String indexId = "mockIndexId";
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+ Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
DatasetInfo dsInfo = new DatasetInfo(101, null);
LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -140,6 +141,7 @@
ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+ Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -161,6 +163,7 @@
ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+ Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -221,4 +224,8 @@
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
return indexCheckpointManagerProvider;
}
+
+ private static String getIndexPath() {
+ return "storage/partition_0/dataverse/dataset/0/index";
+ }
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 459ff01..68ccd54 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -103,6 +103,6 @@
private void waitForReplicatedDatasetsIO() throws HyracksDataException {
// wait for IO operations to ensure replicated datasets files won't change during replica sync
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
- appCtx.getDatasetLifecycleManager().waitForIO(replStrategy);
+ appCtx.getDatasetLifecycleManager().waitForIO(replStrategy, replica.getIdentifier().getPartition());
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index a104ae3..827b713 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IJsonSerializable;
@@ -46,7 +47,8 @@
public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
- return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId));
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId), partition);
}
@Override