[ASTERIXDB-2118][STO] Ensure flush ordering of memory components

- user model changes: no
- storage format changes: no
- interface changes: no

- Fix the bug of AsynchronousScheduler by waking up the next
flush only when the current operation is a FLUSH operation

Change-Id: I7de4a1625fdd3faaa07f65be2ebc714ec7564b29
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2038
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index 5f6766f..438bb0b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -60,19 +60,21 @@
                 super.afterExecute(r, t);
                 LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r;
                 ILSMIOOperation executedOp = task.getOperation();
-                String id = executedOp.getIndexIdentifier();
-                synchronized (this) {
-                    runningFlushOperations.remove(id);
-                    if (waitingFlushOperations.containsKey(id)) {
-                        try {
-                            ILSMIOOperation op = waitingFlushOperations.get(id).poll();
-                            if (op != null) {
-                                scheduleOperation(op);
-                            } else {
-                                waitingFlushOperations.remove(id);
+                if (executedOp.getIOOpertionType() == LSMIOOpertionType.FLUSH) {
+                    String id = executedOp.getIndexIdentifier();
+                    synchronized (this) {
+                        runningFlushOperations.remove(id);
+                        if (waitingFlushOperations.containsKey(id)) {
+                            try {
+                                ILSMIOOperation op = waitingFlushOperations.get(id).poll();
+                                if (op != null) {
+                                    scheduleOperation(op);
+                                } else {
+                                    waitingFlushOperations.remove(id);
+                                }
+                            } catch (HyracksDataException e) {
+                                t = e.getCause();
                             }
-                        } catch (HyracksDataException e) {
-                            t = e.getCause();
                         }
                     }
                 }
@@ -84,7 +86,7 @@
     public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
         if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) {
             executor.submit(operation);
-        } else {
+        } else if (operation.getIOOpertionType() == LSMIOOpertionType.FLUSH) {
             String id = operation.getIndexIdentifier();
             synchronized (executor) {
                 if (runningFlushOperations.containsKey(id)) {
@@ -100,6 +102,10 @@
                     executor.submit(operation);
                 }
             }
+        } else {
+            // this should never happen
+            // just guard here to avoid silient failures in case of future extensions
+            throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType());
         }
     }
 }