[NO ISSUE][STO] Add API to get datasets pending IO
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add a new API that gets the number of pending io (flush/merge)
ops for all datasets on an NC.
Change-Id: I062de60e36677f138c60855ff565a0610d80c998
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7644
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 10eddb1..f7cbf18 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.StorageIOStats;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -143,4 +144,9 @@
* @throws HyracksDataException
*/
void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+
+ /**
+ * @return the current datasets io stats
+ */
+ StorageIOStats getDatasetsIOStats();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 3ff13cb..f8a81e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.context;
+import static org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType.REPLICATE;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -38,7 +40,7 @@
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
- dsInfo.declareActiveIOOperation();
+ dsInfo.declareActiveIOOperation(REPLICATE);
}
}
@@ -54,7 +56,7 @@
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
- dsInfo.undeclareActiveIOOperation();
+ dsInfo.undeclareActiveIOOperation(REPLICATE);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 3fcc528..72dacc4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -42,6 +43,8 @@
private final ILogManager logManager;
private final LogRecord waitLog = new LogRecord();
private int numActiveIOOps;
+ private int pendingFlushes;
+ private int pendingMerges;
private long lastAccess;
private boolean isExternal;
private boolean isRegistered;
@@ -70,12 +73,32 @@
setLastAccess(System.currentTimeMillis());
}
- public synchronized void declareActiveIOOperation() {
+ public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
numActiveIOOps++;
+ switch (opType) {
+ case FLUSH:
+ pendingFlushes++;
+ break;
+ case MERGE:
+ pendingMerges++;
+ break;
+ default:
+ break;
+ }
}
- public synchronized void undeclareActiveIOOperation() {
+ public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
numActiveIOOps--;
+ switch (opType) {
+ case FLUSH:
+ pendingFlushes--;
+ break;
+ case MERGE:
+ pendingMerges--;
+ break;
+ default:
+ break;
+ }
//notify threads waiting on this dataset info
notifyAll();
}
@@ -204,7 +227,7 @@
while (numActiveIOOps > 0) {
try {
/**
- * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
+ * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType)}
*/
wait();
} catch (InterruptedException e) {
@@ -220,4 +243,12 @@
}
}
}
+
+ public synchronized int getPendingFlushes() {
+ return pendingFlushes;
+ }
+
+ public synchronized int getPendingMerges() {
+ return pendingMerges;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 0750749..5ea79b3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -38,6 +38,7 @@
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.storage.StorageIOStats;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
@@ -519,6 +520,16 @@
}
}
+ @Override
+ public StorageIOStats getDatasetsIOStats() {
+ StorageIOStats stats = new StorageIOStats();
+ for (DatasetResource dsr : datasets.values()) {
+ stats.addPendingFlushes(dsr.getDatasetInfo().getPendingFlushes());
+ stats.addPendingMerges(dsr.getDatasetInfo().getPendingMerges());
+ }
+ return stats;
+ }
+
private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
if (indexInfo.isOpen()) {
ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index c3737da..ce6d253 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -259,7 +259,7 @@
@Override
public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException {
- dsInfo.declareActiveIOOperation();
+ dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
pendingFlushes++;
FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +282,7 @@
pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
}
}
- dsInfo.undeclareActiveIOOperation();
+ dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
}
public synchronized boolean hasPendingFlush() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
new file mode 100644
index 0000000..0b44f76
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+public class StorageIOStats {
+
+ private int pendingFlushes;
+ private int pendingMerges;
+
+ public void addPendingFlushes(int pending) {
+ pendingFlushes += pending;
+ }
+
+ public void addPendingMerges(int pending) {
+ pendingMerges += pending;
+ }
+
+ public int getPendingFlushes() {
+ return pendingFlushes;
+ }
+
+ public int getPendingMerges() {
+ return pendingMerges;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 8084c81..753d27a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -37,7 +37,8 @@
FLUSH,
MERGE,
LOAD,
- NOOP
+ NOOP,
+ REPLICATE
}
/**