[NO ISSUE][RT] Fix wait for IO operations

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Some operations such as close dataset, delete components,
  and drop index need to wait for IO operations.
- Before this change, the wait for IO operation would just check
  the count of IO operations on the dataset info. This is not
  enough as a flush might have started by writing the flush log to
  the log tail but only on the flush of that log, we trigger the
  flush operation and the count of IO operation increases.
- To address this problem, we write a wait log before we check the
  IO operation count ensuring that any flush logs in the log tail
  have been flushed and counts incremented.

Change-Id: Ibfa883410cd24e0af54732f7ea6f1b4eb2184e8e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2495
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 44baf77..f4d764a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -23,6 +23,9 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.logging.log4j.LogManager;
@@ -35,6 +38,8 @@
     // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
     private final int datasetID;
+    private final ILogManager logManager;
+    private final LogRecord waitLog = new LogRecord();
     private int numActiveIOOps;
     private long lastAccess;
     private boolean isExternal;
@@ -42,13 +47,16 @@
     private boolean memoryAllocated;
     private boolean durable;
 
-    public DatasetInfo(int datasetID) {
+    public DatasetInfo(int datasetID, ILogManager logManager) {
         this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
         this.setRegistered(false);
         this.setMemoryAllocated(false);
+        this.logManager = logManager;
+        waitLog.setLogType(LogType.WAIT);
+        waitLog.computeAndSetLogSize();
     }
 
     @Override
@@ -199,23 +207,26 @@
         this.lastAccess = lastAccess;
     }
 
-    public synchronized void waitForIO() throws HyracksDataException {
-        while (numActiveIOOps > 0) {
-            try {
-                /**
-                 * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
-                 */
-                wait();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw HyracksDataException.create(e);
+    public void waitForIO() throws HyracksDataException {
+        logManager.log(waitLog);
+        synchronized (this) {
+            while (numActiveIOOps > 0) {
+                try {
+                    /**
+                     * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
+                     */
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
             }
-        }
-        if (numActiveIOOps < 0) {
-            if (LOGGER.isErrorEnabled()) {
-                LOGGER.error("Number of IO operations cannot be negative for dataset: " + this);
+            if (numActiveIOOps < 0) {
+                if (LOGGER.isErrorEnabled()) {
+                    LOGGER.error("Number of IO operations cannot be negative for dataset: " + this);
+                }
+                throw new IllegalStateException("Number of IO operations cannot be negative");
             }
-            throw new IllegalStateException("Number of IO operations cannot be negative");
         }
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f25e4f6..b715eec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -239,7 +239,7 @@
         synchronized (datasets) {
             dsr = datasets.get(did);
             if (dsr == null) {
-                DatasetInfo dsInfo = new DatasetInfo(did);
+                DatasetInfo dsInfo = new DatasetInfo(did, logManager);
                 int partitions = MetadataIndexImmutableProperties.isMetadataDataset(did) ? METADATA_DATASETS_PARTITIONS
                         : numPartitions;
                 DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index f9f742a..befbeed 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -189,7 +189,7 @@
         properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
         properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
 
-        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID);
+        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID, null);
         for (IndexInfo index : indexInfos) {
             dsInfo.addIndex(index.getResourceId(), index);
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 6208cef..3ada608 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -161,7 +161,8 @@
     }
 
     synchronized void syncAppendToLogTail(ILogRecord logRecord) {
-        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) {
+        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
+                && logRecord.getLogType() != LogType.WAIT) {
             ITransactionContext txnCtx = logRecord.getTxnCtx();
             if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
                 throw new ACIDException(