[ASTERIXDB-3220][REPL] Allow waiting for IO on specific dataset partition

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add API to wait for IO on a specific dataset partition.
- When waiting for a partition replica IO ops to finish, only wait
  for the replica partition rather than all partitions.

Change-Id: I90f311f602b3c8526556f64d7b25672981fac320
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 1c2a047..c7eee21 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -161,12 +161,14 @@
     void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException;
 
     /**
-     * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}.
+     * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy} and
+     * {@code partition}.
      *
      * @param replicationStrategy
+     * @param partition
      * @throws HyracksDataException
      */
-    void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+    void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException;
 
     /**
      * @return the current datasets io stats
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 5964bb4..7d3dba4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -33,17 +33,19 @@
     private static final Logger LOGGER = LogManager.getLogger();
     protected final int datasetID;
     protected final DatasetInfo dsInfo;
+    protected final int partition;
 
-    public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
+    public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int partition) {
         this.datasetID = datasetID;
         this.dsInfo = dsInfo;
+        this.partition = partition;
     }
 
     @Override
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.declareActiveIOOperation(REPLICATE);
+            dsInfo.declareActiveIOOperation(REPLICATE, partition);
         }
     }
 
@@ -59,7 +61,7 @@
     public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.undeclareActiveIOOperation(REPLICATE);
+            dsInfo.undeclareActiveIOOperation(REPLICATE, partition);
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index d15d9be..87a3c2f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -33,12 +33,16 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private static final Logger LOGGER = LogManager.getLogger();
     // partition -> index
     private final Map<Integer, Set<IndexInfo>> partitionIndexes;
     // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
+    private final Int2IntMap partitionPendingIO;
     private final int datasetID;
     private final ILogManager logManager;
     private final LogRecord waitLog = new LogRecord();
@@ -54,6 +58,7 @@
     public DatasetInfo(int datasetID, ILogManager logManager) {
         this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
+        this.partitionPendingIO = new Int2IntOpenHashMap();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
         this.setRegistered(false);
@@ -74,7 +79,8 @@
         setLastAccess(System.currentTimeMillis());
     }
 
-    public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+    public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) {
+        partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) + 1);
         numActiveIOOps++;
         switch (opType) {
             case FLUSH:
@@ -91,7 +97,8 @@
         }
     }
 
-    public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+    public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) {
+        partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) - 1);
         numActiveIOOps--;
         switch (opType) {
             case FLUSH:
@@ -253,6 +260,26 @@
         }
     }
 
+    public void waitForIO(int partition) throws HyracksDataException {
+        logManager.log(waitLog);
+        synchronized (this) {
+            while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
+            }
+            if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
+                LOGGER.error("number of IO operations cannot be negative for dataset {}, partition {}", this,
+                        partition);
+                throw new IllegalStateException(
+                        "Number of IO operations cannot be negative: " + this + ", partition " + partition);
+            }
+        }
+    }
+
     public synchronized int getPendingFlushes() {
         return pendingFlushes;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc..4fc9dd6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -552,10 +552,10 @@
     }
 
     @Override
-    public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+    public void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
-                dsr.getDatasetInfo().waitForIO();
+                dsr.getDatasetInfo().waitForIO(partition);
             }
         }
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02..a4ad7cf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,7 +59,6 @@
 @NotThreadSafe
 public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final int partition;
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
     private final ILogManager logManager;
@@ -71,8 +70,7 @@
 
     public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator) {
-        super(datasetID, dsInfo);
-        this.partition = partition;
+        super(datasetID, dsInfo, partition);
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
         this.idGenerator = idGenerator;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51..f56e5c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -74,6 +74,7 @@
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     protected final DatasetInfo dsInfo;
     protected final ILSMIndex lsmIndex;
+    private final int partition;
     private long firstLsnForCurrentMemoryComponent = 0L;
     private long persistenceLsn = 0L;
     private int pendingFlushes = 0;
@@ -84,6 +85,7 @@
         this.dsInfo = dsInfo;
         this.lsmIndex = lsmIndex;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+        this.partition = ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
         componentIds.add(componentId);
     }
 
@@ -259,7 +261,7 @@
 
     @Override
     public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException {
-        dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
+        dsInfo.declareActiveIOOperation(operation.getIOOpertionType(), partition);
         if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
             pendingFlushes++;
             FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +284,7 @@
                         pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
             }
         }
-        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
+        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(), partition);
     }
 
     public synchronized boolean hasPendingFlush() {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index db0911b..33d513f 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -77,6 +77,7 @@
         String indexId = "mockIndexId";
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+        Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         DatasetInfo dsInfo = new DatasetInfo(101, null);
         LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -140,6 +141,7 @@
         ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+        Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -161,6 +163,7 @@
         ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+        Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -221,4 +224,8 @@
         Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
         return indexCheckpointManagerProvider;
     }
+
+    private static String getIndexPath() {
+        return "storage/partition_0/dataverse/dataset/0/index";
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 459ff01..68ccd54 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -103,6 +103,6 @@
     private void waitForReplicatedDatasetsIO() throws HyracksDataException {
         // wait for IO operations to ensure replicated datasets files won't change during replica sync
         final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
-        appCtx.getDatasetLifecycleManager().waitForIO(replStrategy);
+        appCtx.getDatasetLifecycleManager().waitForIO(replStrategy, replica.getIdentifier().getPartition());
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index a104ae3..827b713 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -21,6 +21,7 @@
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
@@ -46,7 +47,8 @@
     public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
-        return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId));
+        int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId), partition);
     }
 
     @Override