Wait for io properly. Added additional commentes.
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 9cbdb81..617b6ff 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -122,21 +122,9 @@
}
}
- if (iInfo.isOpen) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker()).getIOOperationCallback());
- }
-
- // Then wait for the above flush op.
- // They are separated so they don't deadlock each other.
- while (dsInfo.numActiveIOOps > 0) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
+ // Flush and wait for it to finish, it is separated from the above wait so they don't deadlock each other.
+ // TODO: Find a better way to do this.
+ flushAndWaitForIO(dsInfo, iInfo);
if (iInfo.isOpen) {
iInfo.index.deactivate(false);
@@ -235,21 +223,12 @@
}
for (IndexInfo iInfo : dsInfo.indexes.values()) {
- if (iInfo.isOpen) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker())
- .getIOOperationCallback());
- }
+ // TODO: This is not efficient since we flush the indexes sequentially.
+ // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
+ // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
+ flushAndWaitForIO(dsInfo, iInfo);
}
- // Wait for the above flush ops.
- while (dsInfo.numActiveIOOps > 0) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
+
for (IndexInfo iInfo : dsInfo.indexes.values()) {
if (iInfo.isOpen) {
iInfo.index.deactivate(false);
@@ -267,10 +246,25 @@
}
}
-
return false;
}
+ private void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
+ if (iInfo.isOpen) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker()).getIOOperationCallback());
+ }
+ // Wait for the above flush op.
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
@Override
public synchronized void close(long resourceID) throws HyracksDataException {
int did = getDIDfromRID(resourceID);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index f2c1dd8..2ed4b0ec 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -73,8 +73,9 @@
}
private void flushIfFull() throws HyracksDataException {
- Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
// If we need a flush, and this is the last completing operation, then schedule the flush.
+ // TODO: Is there a better way to check if we need to flush instead of communicating with the datasetLifecycleManager each time?
+ Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
boolean needsFlush = false;
for (ILSMIndex lsmIndex : indexes) {
if (((ILSMIndexInternal) lsmIndex).hasFlushRequestForCurrentMutableComponent()) {