[NO ISSUE][STO] Refactor Waiting For Dataset IO Ops
Change-Id: I6e37fe5062f471d30219c9d0a374c92a9917d513
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2099
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 71d4a96..f703c19 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -22,14 +22,18 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
+ private static final Logger LOGGER = Logger.getLogger(DatasetInfo.class.getName());
private final Map<Long, IndexInfo> indexes;
private final int datasetID;
- private long lastAccess;
private int numActiveIOOps;
+ private long lastAccess;
private boolean isExternal;
private boolean isRegistered;
private boolean memoryAllocated;
@@ -56,11 +60,11 @@
}
public synchronized void declareActiveIOOperation() {
- setNumActiveIOOps(getNumActiveIOOps() + 1);
+ numActiveIOOps++;
}
public synchronized void undeclareActiveIOOperation() {
- setNumActiveIOOps(getNumActiveIOOps() - 1);
+ numActiveIOOps--;
//notify threads waiting on this dataset info
notifyAll();
}
@@ -126,7 +130,7 @@
return datasetID == ((DatasetInfo) obj).datasetID;
}
return false;
- };
+ }
@Override
public int hashCode() {
@@ -144,14 +148,6 @@
return durable;
}
- public int getNumActiveIOOps() {
- return numActiveIOOps;
- }
-
- public void setNumActiveIOOps(int numActiveIOOps) {
- this.numActiveIOOps = numActiveIOOps;
- }
-
public boolean isExternal() {
return isExternal;
}
@@ -195,4 +191,24 @@
public void setLastAccess(long lastAccess) {
this.lastAccess = lastAccess;
}
+
+ public synchronized void waitForIO() throws HyracksDataException {
+ while (numActiveIOOps > 0) {
+ try {
+ /**
+ * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
+ */
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ if (numActiveIOOps < 0) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Number of IO operations cannot be negative for dataset: " + this);
+ }
+ throw new IllegalStateException("Number of IO operations cannot be negative");
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 37bd789..559d57c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -102,7 +102,7 @@
datasetResource.register(resource, index);
}
- public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
+ private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
LocalResource lr = resourceRepository.get(resourcePath);
if (lr == null) {
return -1;
@@ -110,7 +110,7 @@
return ((DatasetLocalResource) lr.getResource()).getDatasetId();
}
- public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
+ private long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
LocalResource lr = resourceRepository.get(resourcePath);
if (lr == null) {
return -1;
@@ -133,36 +133,24 @@
PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
- throw new HyracksDataException("Cannot remove index while it is open. (Dataset reference count = "
- + iInfo.getReferenceCount() + ", Operation tracker number of active operations = "
- + opTracker.getNumActiveOperations() + ")");
+ throw new HyracksDataException(
+ "Cannot remove index while it is open. (Dataset reference count = " + iInfo.getReferenceCount()
+ + ", Operation tracker number of active operations = " + opTracker.getNumActiveOperations()
+ + ")");
}
// TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
- // First wait for any ongoing IO operations
DatasetInfo dsInfo = dsr.getDatasetInfo();
- synchronized (dsInfo) {
- while (dsInfo.getNumActiveIOOps() > 0) {
- try {
- //notification will come from DatasetInfo class (undeclareActiveIOOperation)
- dsInfo.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
- }
- }
- }
-
+ dsInfo.waitForIO();
if (iInfo.isOpen()) {
ILSMOperationTracker indexOpTracker = iInfo.getIndex().getOperationTracker();
synchronized (indexOpTracker) {
iInfo.getIndex().deactivate(false);
}
}
-
dsInfo.getIndexes().remove(resourceID);
- if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
- && !dsInfo.isExternal()) {
+ if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty() && !dsInfo
+ .isExternal()) {
removeDatasetFromCache(dsInfo.getDatasetID());
}
}
@@ -227,16 +215,7 @@
}
// Wait for the above flush op.
- synchronized (dsInfo) {
- while (dsInfo.getNumActiveIOOps() > 0) {
- try {
- //notification will come from DatasetInfo class (undeclareActiveIOOperation)
- dsInfo.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- }
+ dsInfo.waitForIO();
}
public DatasetResource getDatasetLifecycle(int did) {
@@ -434,16 +413,7 @@
private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
// First wait for any ongoing IO operations
- synchronized (dsInfo) {
- while (dsInfo.getNumActiveIOOps() > 0) {
- try {
- dsInfo.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
- }
- }
- }
+ dsInfo.waitForIO();
try {
flushDatasetOpenIndexes(dsInfo, false);
} catch (Exception e) {