[NO ISSUE][STO] Add API to get datasets pending IO

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add a new API that gets the number of pending io (flush/merge)
  ops for all datasets on an NC.

Change-Id: I062de60e36677f138c60855ff565a0610d80c998
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7644
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 10eddb1..f7cbf18 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.StorageIOStats;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -143,4 +144,9 @@
      * @throws HyracksDataException
      */
     void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+
+    /**
+     * @return the current datasets io stats
+     */
+    StorageIOStats getDatasetsIOStats();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 3ff13cb..f8a81e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.context;
 
+import static org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType.REPLICATE;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -38,7 +40,7 @@
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.declareActiveIOOperation();
+            dsInfo.declareActiveIOOperation(REPLICATE);
         }
     }
 
@@ -54,7 +56,7 @@
     public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.undeclareActiveIOOperation();
+            dsInfo.undeclareActiveIOOperation(REPLICATE);
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 3fcc528..72dacc4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -42,6 +43,8 @@
     private final ILogManager logManager;
     private final LogRecord waitLog = new LogRecord();
     private int numActiveIOOps;
+    private int pendingFlushes;
+    private int pendingMerges;
     private long lastAccess;
     private boolean isExternal;
     private boolean isRegistered;
@@ -70,12 +73,32 @@
         setLastAccess(System.currentTimeMillis());
     }
 
-    public synchronized void declareActiveIOOperation() {
+    public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
         numActiveIOOps++;
+        switch (opType) {
+            case FLUSH:
+                pendingFlushes++;
+                break;
+            case MERGE:
+                pendingMerges++;
+                break;
+            default:
+                break;
+        }
     }
 
-    public synchronized void undeclareActiveIOOperation() {
+    public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
         numActiveIOOps--;
+        switch (opType) {
+            case FLUSH:
+                pendingFlushes--;
+                break;
+            case MERGE:
+                pendingMerges--;
+                break;
+            default:
+                break;
+        }
         //notify threads waiting on this dataset info
         notifyAll();
     }
@@ -204,7 +227,7 @@
             while (numActiveIOOps > 0) {
                 try {
                     /**
-                     * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
+                     * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType)}
                      */
                     wait();
                 } catch (InterruptedException e) {
@@ -220,4 +243,12 @@
             }
         }
     }
+
+    public synchronized int getPendingFlushes() {
+        return pendingFlushes;
+    }
+
+    public synchronized int getPendingMerges() {
+        return pendingMerges;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 0750749..5ea79b3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.storage.StorageIOStats;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
@@ -519,6 +520,16 @@
         }
     }
 
+    @Override
+    public StorageIOStats getDatasetsIOStats() {
+        StorageIOStats stats = new StorageIOStats();
+        for (DatasetResource dsr : datasets.values()) {
+            stats.addPendingFlushes(dsr.getDatasetInfo().getPendingFlushes());
+            stats.addPendingMerges(dsr.getDatasetInfo().getPendingMerges());
+        }
+        return stats;
+    }
+
     private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
         if (indexInfo.isOpen()) {
             ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index c3737da..ce6d253 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -259,7 +259,7 @@
 
     @Override
     public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException {
-        dsInfo.declareActiveIOOperation();
+        dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
         if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
             pendingFlushes++;
             FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +282,7 @@
                         pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
             }
         }
-        dsInfo.undeclareActiveIOOperation();
+        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
     }
 
     public synchronized boolean hasPendingFlush() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
new file mode 100644
index 0000000..0b44f76
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+public class StorageIOStats {
+
+    private int pendingFlushes;
+    private int pendingMerges;
+
+    public void addPendingFlushes(int pending) {
+        pendingFlushes += pending;
+    }
+
+    public void addPendingMerges(int pending) {
+        pendingMerges += pending;
+    }
+
+    public int getPendingFlushes() {
+        return pendingFlushes;
+    }
+
+    public int getPendingMerges() {
+        return pendingMerges;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 8084c81..753d27a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -37,7 +37,8 @@
         FLUSH,
         MERGE,
         LOAD,
-        NOOP
+        NOOP,
+        REPLICATE
     }
 
     /**