ASTERIXDB-1011: added flow control for merge policy
See the design document here:
https://cwiki.apache.org/confluence/display/ASTERIXDB/Flush-Operation+Flow+Control+For+Merge+Policy

Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14
Reviewed-on: https://asterix-gerrit.ics.uci.edu/795
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index a7374d3..3d112ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -46,13 +46,13 @@
     private final int datasetID;
 
     public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) {
-        this.datasetLifecycleManager = (DatasetLifecycleManager)datasetLifecycleManager;
+        this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
         this.datasetID = datasetID;
     }
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         // This merge policy will only look at primary indexes in order to evaluate if a merge operation is needed. If it decides that
         // a merge operation is needed, then it will merge *all* the indexes that belong to the dataset. The criteria to decide if a merge
         // is needed is the same as the one that is used in the prefix merge policy:
@@ -113,8 +113,8 @@
                     // Reverse the components order back to its original order
                     Collections.reverse(mergableComponents);
 
-                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(
-                            NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+                            NoOpOperationCallback.INSTANCE);
                     accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
                 }
                 break;
@@ -127,4 +127,10 @@
         maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
         maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) {
+        //TODO implement properly according to the merge policy
+        return false;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 841117b..c64fe63 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -25,8 +25,35 @@
 import org.apache.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMMergePolicy {
-    public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
-            IndexException;
+    public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException;
 
     public void configure(Map<String, String> properties);
+
+    /**
+     * This method is used for flush-operation flow control:
+     * When the space occupancy of the in-memory component exceeds a specified memory threshold,
+     * entries are flushed to disk. As entries accumulate on disk, the entries are periodically
+     * merged together subject to a merge policy that decides when and what to merge. A merge
+     * policy may impose a certain constraint such as a maximum number of mergable(merge-able)
+     * disk components in order to provide a reasonable query response time. Otherwise, the query
+     * response time gets slower as the number of disk components increases.
+     * In order to avoid such an unexpected situation according to the merge policy, a way to
+     * control the number of disk components is provided by introducing a new method,
+     * isMegeLagging() in ILSMMergePolicy interface. When flushing an in-memory component is completed,
+     * the provided isMergeLagging() method is called to decide whether the memory budget for the
+     * current flushed in-memory component should be available for the incoming updated(inserted/deleted/updated)
+     * entries or not. If the method returns true, i.e., the merge operation is lagged according to
+     * the merge policy, the memory budget will not be made available for the incoming entries by
+     * making the current flush operation thread wait until (ongoing) merge operation finishes.
+     * Therefore, this will effectively prevent the number of disk components from exceeding
+     * a threshold of the allowed number of disk components.
+     *
+     * @param index
+     * @return true if merge operation is lagged according to the implemented merge policy,
+     *         false otherwise.
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 93d6fd4..aad5bf47 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -35,20 +35,20 @@
     private int numComponents;
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         List<ILSMComponent> immutableComponents = index.getImmutableComponents();
-        for (ILSMComponent c : immutableComponents) {
-            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
-                return;
-            }
+
+        if (!areComponentsMergable(immutableComponents)) {
+            return;
         }
+
         if (fullMergeIsRequested) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             accessor.scheduleFullMerge(index.getIOOperationCallback());
         } else if (immutableComponents.size() >= numComponents) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
         }
@@ -58,4 +58,89 @@
     public void configure(Map<String, String> properties) {
         numComponents = Integer.parseInt(properties.get("num-components"));
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException {
+        // see PrefixMergePolicy.isMergeLagging() for the rationale behind this code.
+
+        /**
+         * case 1.
+         * if totalImmutableCommponentCount < threshold,
+         * merge operation is not lagged ==> return false.
+         * case 2.
+         * if a) totalImmutableCommponentCount >= threshold && b) there is an ongoing merge,
+         * merge operation is lagged. ==> return true.
+         * case 3. *SPECIAL CASE*
+         * if a) totalImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge,
+         * merge operation is lagged. ==> *schedule a merge operation* and then return true.
+         * This is a special case that requires to schedule a merge operation.
+         * Otherwise, all flush operations will be hung.
+         * This case can happen in a following situation:
+         * The system may crash when
+         * condition 1) the mergableImmutableCommponentCount >= threshold and
+         * condition 2) merge operation is going on.
+         * After the system is recovered, still condition 1) is true.
+         * If there are flush operations in the same dataset partition after the recovery,
+         * all these flush operations may not proceed since there is no ongoing merge and
+         * there will be no new merge either in this situation.
+         */
+
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        int totalImmutableComponentCount = immutableComponents.size();
+
+        // [case 1]
+        if (totalImmutableComponentCount < numComponents) {
+            return false;
+        }
+
+        boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+        // here, implicitly (totalImmutableComponentCount >= numComponents) is true by passing case 1.
+        if (isMergeOngoing) {
+            // [case 2]
+            return true;
+        } else {
+            // [case 3]
+            // schedule a merge operation after making sure that all components are mergable
+            if (!areComponentsMergable(immutableComponents)) {
+                throw new IllegalStateException();
+            }
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+            return true;
+        }
+    }
+
+    /**
+     * checks whether all given components are mergable or not
+     *
+     * @param immutableComponents
+     * @return true if all components are mergable, false otherwise.
+     */
+    private boolean areComponentsMergable(List<ILSMComponent> immutableComponents) {
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * This method returns whether there is an ongoing merge operation or not by checking
+     * each component state of given components.
+     *
+     * @param immutableComponents
+     * @return true if there is an ongoing merge operation, false otherwise.
+     */
+    private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) {
+        int size = immutableComponents.size();
+        for (int i = 0; i < size; i++) {
+            if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index a19532f..b58cc29 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -200,6 +200,25 @@
         try {
             synchronized (opTracker) {
                 try {
+
+                    /**
+                     * [flow control]
+                     * If merge operations are lagged according to the merge policy,
+                     * flushing in-memory components are hold until the merge operation catches up.
+                     * See PrefixMergePolicy.isMergeLagging() for more details.
+                     */
+                    if (opType == LSMOperationType.FLUSH) {
+                        while (mergePolicy.isMergeLagging(lsmIndex)) {
+                            try {
+                                opTracker.wait();
+                            } catch (InterruptedException e) {
+                                //ignore
+                            }
+                        }
+                    } else if (opType == LSMOperationType.MERGE) {
+                        opTracker.notifyAll();
+                    }
+
                     int i = 0;
                     // First check if there is any action that is needed to be taken based on the state of each component.
                     for (ILSMComponent c : ctx.getComponentHolder()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 39ab815..86be9c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -28,8 +28,8 @@
 public class NoMergePolicy implements ILSMMergePolicy {
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         // Do nothing
     }
 
@@ -37,4 +37,9 @@
     public void configure(Map<String, String> properties) {
         // Do nothing
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) {
+        return false;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 5b8da53..36cb958 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -39,8 +39,188 @@
     private int maxToleranceComponentCount;
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
+
+        List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
+
+        if (!areComponentsReadableWritableState(immutableComponents)) {
+            return;
+        }
+
+        if (fullMergeIsRequested) {
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleFullMerge(index.getIOOperationCallback());
+            return;
+        }
+
+        scheduleMerge(index);
+    }
+
+    @Override
+    public void configure(Map<String, String> properties) {
+        maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
+        maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
+    }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException {
+
+        /**
+         * [for flow-control purpose]
+         * when merge operations are lagged, threads which flushed components will be blocked
+         * until merge operations catch up, i.e, until the number of mergable immutable components <= maxToleranceComponentCount
+         * example:
+         * suppose that maxToleranceComponentCount = 3 and maxMergableComponentSize = 1GB
+         * The following shows a set of events occurred in time ti with a brief description.
+         * time
+         * t40: c32-1(1GB, RU) c38-33(192MB, RU) c39-39(32MB, RU) c40-40(32MB, RU)
+         * --> a thread which added c40-40 will trigger a merge including c38-33,c39-39,c40-40
+         * t41: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU)
+         * --> a thread which added c41-41 will not be blocked
+         * t42: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU)
+         * --> a thread which added c42-42 will not be blocked
+         * t43: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU)
+         * --> a thread which added c43-43 will not be blocked and will not trigger a merge since there is an ongoing merge triggered in t1.
+         * t44: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)'
+         * --> a thread which will add c44-44 (even if the disk component is created, but not added to index instance disk components yet)
+         * will be blocked until the number of RU components < maxToleranceComponentCount
+         * t45: c32-1(1GB, RU) *c40-33(256MB, RU)* c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)'
+         * --> a thread which completed the merge triggered in t1 added c40-33 and will go ahead and trigger the next merge with c40-33,c41-41,c42-42,c43-43.
+         * Still, the blocked thread will continue being blocked and the c44-44 was not included in the merge since it's not added yet.
+         * t46: c32-1(1GB, RU) c40-33(256MB, RUM) c41-41(32MB, RUM) c42-42(32MB, RUM) c43-43(32MB, RUM) c44-44(32MB, RUM)
+         * --> the merge triggered in t45 is going on and the merge unblocked the blocked thread, so c44-44 was added.
+         * t47: c32-1(1GB, RU) *c43-33(320MB, RU)* c44-44(32MB, RUM)
+         * --> a thread completed the merge triggered in t45 and added c43-33.
+         * t48: c32-1(1GB, RU) c43-33(320MB, RU) c44-44(32MB, RUM) c48-48(32MB, RU)
+         * --> a thread added c48-48 and will not be blocked and will trigger a merge with c43-44, c44-44, c48-48.
+         * ... continues ...
+         * ----------------------------------------
+         * legend:
+         * For example, C32-1 represents a disk component, more specifically, disk component name, where 32-1 represents a timestamp range from t1 to time t32.
+         * This means that the component C32-1 is a component resulting from a merge operation that merged components C1-1 to C32-32.
+         * This also implies that if two timestamps in a component name are equal, the component has not been merged yet after it was created.
+         * RU and RUM are possible state of disk components, where RU represents READABLE_UNWRITABLE and RUM represents READABLE_UNWRITABLE_MERGING.
+         * Now, c32-1(1GB, RU) represents a disk component resulted from merging c1-1 ~ c32-32 and the component size is 1GB.
+         * ----------------------------------------
+         * The flow control allows at most maxToleranceComponentCount mergable components,
+         * where the mergable components are disk components whose i) state == RU and ii) size < maxMergableComponentSize.
+         */
+
+        /**
+         * case 1.
+         * if mergableImmutableCommponentCount < threshold,
+         * merge operation is not lagged ==> return false.
+         * case 2.
+         * if a) mergableImmutableCommponentCount >= threshold && b) there is an ongoing merge,
+         * merge operation is lagged. ==> return true.
+         * case 3. *SPECIAL CASE*
+         * if a) mergableImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge,
+         * merge operation is lagged. ==> *schedule a merge operation* and then return true.
+         * This is a special case that requires to schedule a merge operation.
+         * Otherwise, all flush operations will be hung.
+         * This case can happen in a following situation:
+         * The system may crash when
+         * condition 1) the mergableImmutableCommponentCount >= threshold and
+         * condition 2) merge operation is going on.
+         * After the system is recovered, still condition 1) is true.
+         * If there are flush operations in the same dataset partition after the recovery,
+         * all these flush operations may not proceed since there is no ongoing merge and
+         * there will be no new merge either in this situation.
+         */
+
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
+
+        // [case 1]
+        if (mergableImmutableComponentCount < maxToleranceComponentCount) {
+            return false;
+        }
+
+        boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+        // here, implicitly (mergableImmutableComponentCount >= maxToleranceComponentCount) is true by passing case 1.
+        if (isMergeOngoing) {
+            // [case 2]
+            return true;
+        } else {
+            // [case 3]
+            // make sure that all components are of READABLE_UNWRITABLE state.
+            if (!areComponentsReadableWritableState(immutableComponents)) {
+                throw new IllegalStateException();
+            }
+            // schedule a merge operation
+            boolean isMergeTriggered = scheduleMerge(index);
+            if (!isMergeTriggered) {
+                throw new IllegalStateException();
+            }
+            return true;
+        }
+    }
+
+    /**
+     * This method returns whether there is an ongoing merge operation or not by checking
+     * each component state of given components.
+     *
+     * @param immutableComponents
+     * @return true if there is an ongoing merge operation, false otherwise.
+     */
+    private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) {
+        int size = immutableComponents.size();
+        for (int i = 0; i < size; i++) {
+            if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * This method returns the number of mergable components among the given list
+     * of immutable components that are ordered from the latest component to order ones. A caller
+     * need to make sure the order in the list.
+     *
+     * @param immutableComponents
+     * @return the number of mergable component
+     */
+    private int getMergableImmutableComponentCount(List<ILSMComponent> immutableComponents) {
+        int count = 0;
+        for (ILSMComponent c : immutableComponents) {
+            long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+            //stop when the first non-mergable component is found.
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize) {
+                break;
+            }
+            ++count;
+        }
+        return count;
+    }
+
+    /**
+     * checks whether all given components are of READABLE_UNWRITABLE state
+     *
+     * @param immutableComponents
+     * @return true if all components are of READABLE_UNWRITABLE state, false otherwise.
+     */
+    private boolean areComponentsReadableWritableState(List<ILSMComponent> immutableComponents) {
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * schedule a merge operation according to this prefix merge policy
+     *
+     * @param index
+     * @return true if merge is scheduled, false otherwise.
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
+    private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException, IndexException {
         // 1.  Look at the candidate components for merging in oldest-first order.  If one exists, identify the prefix of the sequence of
         // all such components for which the sum of their sizes exceeds MaxMrgCompSz.  Schedule a merge of those components into a new component.
         // 2.  If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt.  If so, schedule
@@ -49,17 +229,6 @@
         // Reverse the components order so that we look at components from oldest to newest.
         Collections.reverse(immutableComponents);
 
-        for (ILSMComponent c : immutableComponents) {
-            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
-                return;
-            }
-        }
-        if (fullMergeIsRequested) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFullMerge(index.getIOOperationCallback());
-            return;
-        }
         long totalSize = 0;
         int startIndex = -1;
         for (int i = 0; i < immutableComponents.size(); i++) {
@@ -80,17 +249,13 @@
                 }
                 // Reverse the components order back to its original order
                 Collections.reverse(mergableComponents);
-                ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
                 accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
-                break;
+                return true;
             }
         }
+        return false;
     }
 
-    @Override
-    public void configure(Map<String, String> properties) {
-        maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
-        maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
-    }
 }