ASTERIXDB-1011: added flow control for merge policy
See the design document here:
https://cwiki.apache.org/confluence/display/ASTERIXDB/Flush-Operation+Flow+Control+For+Merge+Policy
Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14
Reviewed-on: https://asterix-gerrit.ics.uci.edu/795
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index a7374d3..3d112ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -46,13 +46,13 @@
private final int datasetID;
public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) {
- this.datasetLifecycleManager = (DatasetLifecycleManager)datasetLifecycleManager;
+ this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
this.datasetID = datasetID;
}
@Override
- public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
- IndexException {
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+ throws HyracksDataException, IndexException {
// This merge policy will only look at primary indexes in order to evaluate if a merge operation is needed. If it decides that
// a merge operation is needed, then it will merge *all* the indexes that belong to the dataset. The criteria to decide if a merge
// is needed is the same as the one that is used in the prefix merge policy:
@@ -113,8 +113,8 @@
// Reverse the components order back to its original order
Collections.reverse(mergableComponents);
- ILSMIndexAccessor accessor = lsmIndex.createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
}
break;
@@ -127,4 +127,10 @@
maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
}
+
+ @Override
+ public boolean isMergeLagging(ILSMIndex index) {
+ //TODO implement properly according to the merge policy
+ return false;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 841117b..c64fe63 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -25,8 +25,35 @@
import org.apache.hyracks.storage.am.common.api.IndexException;
public interface ILSMMergePolicy {
- public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
- IndexException;
+ public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested)
+ throws HyracksDataException, IndexException;
public void configure(Map<String, String> properties);
+
+ /**
+ * This method is used for flush-operation flow control:
+ * When the space occupancy of the in-memory component exceeds a specified memory threshold,
+ * entries are flushed to disk. As entries accumulate on disk, the entries are periodically
+ * merged together subject to a merge policy that decides when and what to merge. A merge
+ * policy may impose a certain constraint such as a maximum number of mergable(merge-able)
+ * disk components in order to provide a reasonable query response time. Otherwise, the query
+ * response time gets slower as the number of disk components increases.
+ * In order to avoid such an unexpected situation according to the merge policy, a way to
+ * control the number of disk components is provided by introducing a new method,
+ * isMegeLagging() in ILSMMergePolicy interface. When flushing an in-memory component is completed,
+ * the provided isMergeLagging() method is called to decide whether the memory budget for the
+ * current flushed in-memory component should be available for the incoming updated(inserted/deleted/updated)
+ * entries or not. If the method returns true, i.e., the merge operation is lagged according to
+ * the merge policy, the memory budget will not be made available for the incoming entries by
+ * making the current flush operation thread wait until (ongoing) merge operation finishes.
+ * Therefore, this will effectively prevent the number of disk components from exceeding
+ * a threshold of the allowed number of disk components.
+ *
+ * @param index
+ * @return true if merge operation is lagged according to the implemented merge policy,
+ * false otherwise.
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
+ public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 93d6fd4..aad5bf47 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -35,20 +35,20 @@
private int numComponents;
@Override
- public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
- IndexException {
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+ throws HyracksDataException, IndexException {
List<ILSMComponent> immutableComponents = index.getImmutableComponents();
- for (ILSMComponent c : immutableComponents) {
- if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
- return;
- }
+
+ if (!areComponentsMergable(immutableComponents)) {
+ return;
}
+
if (fullMergeIsRequested) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleFullMerge(index.getIOOperationCallback());
} else if (immutableComponents.size() >= numComponents) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
}
@@ -58,4 +58,89 @@
public void configure(Map<String, String> properties) {
numComponents = Integer.parseInt(properties.get("num-components"));
}
+
+ @Override
+ public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException {
+ // see PrefixMergePolicy.isMergeLagging() for the rationale behind this code.
+
+ /**
+ * case 1.
+ * if totalImmutableCommponentCount < threshold,
+ * merge operation is not lagged ==> return false.
+ * case 2.
+ * if a) totalImmutableCommponentCount >= threshold && b) there is an ongoing merge,
+ * merge operation is lagged. ==> return true.
+ * case 3. *SPECIAL CASE*
+ * if a) totalImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge,
+ * merge operation is lagged. ==> *schedule a merge operation* and then return true.
+ * This is a special case that requires to schedule a merge operation.
+ * Otherwise, all flush operations will be hung.
+ * This case can happen in a following situation:
+ * The system may crash when
+ * condition 1) the mergableImmutableCommponentCount >= threshold and
+ * condition 2) merge operation is going on.
+ * After the system is recovered, still condition 1) is true.
+ * If there are flush operations in the same dataset partition after the recovery,
+ * all these flush operations may not proceed since there is no ongoing merge and
+ * there will be no new merge either in this situation.
+ */
+
+ List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ int totalImmutableComponentCount = immutableComponents.size();
+
+ // [case 1]
+ if (totalImmutableComponentCount < numComponents) {
+ return false;
+ }
+
+ boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+ // here, implicitly (totalImmutableComponentCount >= numComponents) is true by passing case 1.
+ if (isMergeOngoing) {
+ // [case 2]
+ return true;
+ } else {
+ // [case 3]
+ // schedule a merge operation after making sure that all components are mergable
+ if (!areComponentsMergable(immutableComponents)) {
+ throw new IllegalStateException();
+ }
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+ return true;
+ }
+ }
+
+ /**
+ * checks whether all given components are mergable or not
+ *
+ * @param immutableComponents
+ * @return true if all components are mergable, false otherwise.
+ */
+ private boolean areComponentsMergable(List<ILSMComponent> immutableComponents) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * This method returns whether there is an ongoing merge operation or not by checking
+ * each component state of given components.
+ *
+ * @param immutableComponents
+ * @return true if there is an ongoing merge operation, false otherwise.
+ */
+ private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) {
+ int size = immutableComponents.size();
+ for (int i = 0; i < size; i++) {
+ if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index a19532f..b58cc29 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -200,6 +200,25 @@
try {
synchronized (opTracker) {
try {
+
+ /**
+ * [flow control]
+ * If merge operations are lagged according to the merge policy,
+ * flushing in-memory components are hold until the merge operation catches up.
+ * See PrefixMergePolicy.isMergeLagging() for more details.
+ */
+ if (opType == LSMOperationType.FLUSH) {
+ while (mergePolicy.isMergeLagging(lsmIndex)) {
+ try {
+ opTracker.wait();
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ } else if (opType == LSMOperationType.MERGE) {
+ opTracker.notifyAll();
+ }
+
int i = 0;
// First check if there is any action that is needed to be taken based on the state of each component.
for (ILSMComponent c : ctx.getComponentHolder()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 39ab815..86be9c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -28,8 +28,8 @@
public class NoMergePolicy implements ILSMMergePolicy {
@Override
- public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
- IndexException {
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+ throws HyracksDataException, IndexException {
// Do nothing
}
@@ -37,4 +37,9 @@
public void configure(Map<String, String> properties) {
// Do nothing
}
+
+ @Override
+ public boolean isMergeLagging(ILSMIndex index) {
+ return false;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 5b8da53..36cb958 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -39,8 +39,188 @@
private int maxToleranceComponentCount;
@Override
- public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
- IndexException {
+ public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+ throws HyracksDataException, IndexException {
+
+ List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
+
+ if (!areComponentsReadableWritableState(immutableComponents)) {
+ return;
+ }
+
+ if (fullMergeIsRequested) {
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFullMerge(index.getIOOperationCallback());
+ return;
+ }
+
+ scheduleMerge(index);
+ }
+
+ @Override
+ public void configure(Map<String, String> properties) {
+ maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
+ maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
+ }
+
+ @Override
+ public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException {
+
+ /**
+ * [for flow-control purpose]
+ * when merge operations are lagged, threads which flushed components will be blocked
+ * until merge operations catch up, i.e, until the number of mergable immutable components <= maxToleranceComponentCount
+ * example:
+ * suppose that maxToleranceComponentCount = 3 and maxMergableComponentSize = 1GB
+ * The following shows a set of events occurred in time ti with a brief description.
+ * time
+ * t40: c32-1(1GB, RU) c38-33(192MB, RU) c39-39(32MB, RU) c40-40(32MB, RU)
+ * --> a thread which added c40-40 will trigger a merge including c38-33,c39-39,c40-40
+ * t41: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU)
+ * --> a thread which added c41-41 will not be blocked
+ * t42: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU)
+ * --> a thread which added c42-42 will not be blocked
+ * t43: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU)
+ * --> a thread which added c43-43 will not be blocked and will not trigger a merge since there is an ongoing merge triggered in t1.
+ * t44: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)'
+ * --> a thread which will add c44-44 (even if the disk component is created, but not added to index instance disk components yet)
+ * will be blocked until the number of RU components < maxToleranceComponentCount
+ * t45: c32-1(1GB, RU) *c40-33(256MB, RU)* c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)'
+ * --> a thread which completed the merge triggered in t1 added c40-33 and will go ahead and trigger the next merge with c40-33,c41-41,c42-42,c43-43.
+ * Still, the blocked thread will continue being blocked and the c44-44 was not included in the merge since it's not added yet.
+ * t46: c32-1(1GB, RU) c40-33(256MB, RUM) c41-41(32MB, RUM) c42-42(32MB, RUM) c43-43(32MB, RUM) c44-44(32MB, RUM)
+ * --> the merge triggered in t45 is going on and the merge unblocked the blocked thread, so c44-44 was added.
+ * t47: c32-1(1GB, RU) *c43-33(320MB, RU)* c44-44(32MB, RUM)
+ * --> a thread completed the merge triggered in t45 and added c43-33.
+ * t48: c32-1(1GB, RU) c43-33(320MB, RU) c44-44(32MB, RUM) c48-48(32MB, RU)
+ * --> a thread added c48-48 and will not be blocked and will trigger a merge with c43-44, c44-44, c48-48.
+ * ... continues ...
+ * ----------------------------------------
+ * legend:
+ * For example, C32-1 represents a disk component, more specifically, disk component name, where 32-1 represents a timestamp range from t1 to time t32.
+ * This means that the component C32-1 is a component resulting from a merge operation that merged components C1-1 to C32-32.
+ * This also implies that if two timestamps in a component name are equal, the component has not been merged yet after it was created.
+ * RU and RUM are possible state of disk components, where RU represents READABLE_UNWRITABLE and RUM represents READABLE_UNWRITABLE_MERGING.
+ * Now, c32-1(1GB, RU) represents a disk component resulted from merging c1-1 ~ c32-32 and the component size is 1GB.
+ * ----------------------------------------
+ * The flow control allows at most maxToleranceComponentCount mergable components,
+ * where the mergable components are disk components whose i) state == RU and ii) size < maxMergableComponentSize.
+ */
+
+ /**
+ * case 1.
+ * if mergableImmutableCommponentCount < threshold,
+ * merge operation is not lagged ==> return false.
+ * case 2.
+ * if a) mergableImmutableCommponentCount >= threshold && b) there is an ongoing merge,
+ * merge operation is lagged. ==> return true.
+ * case 3. *SPECIAL CASE*
+ * if a) mergableImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge,
+ * merge operation is lagged. ==> *schedule a merge operation* and then return true.
+ * This is a special case that requires to schedule a merge operation.
+ * Otherwise, all flush operations will be hung.
+ * This case can happen in a following situation:
+ * The system may crash when
+ * condition 1) the mergableImmutableCommponentCount >= threshold and
+ * condition 2) merge operation is going on.
+ * After the system is recovered, still condition 1) is true.
+ * If there are flush operations in the same dataset partition after the recovery,
+ * all these flush operations may not proceed since there is no ongoing merge and
+ * there will be no new merge either in this situation.
+ */
+
+ List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
+
+ // [case 1]
+ if (mergableImmutableComponentCount < maxToleranceComponentCount) {
+ return false;
+ }
+
+ boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+ // here, implicitly (mergableImmutableComponentCount >= maxToleranceComponentCount) is true by passing case 1.
+ if (isMergeOngoing) {
+ // [case 2]
+ return true;
+ } else {
+ // [case 3]
+ // make sure that all components are of READABLE_UNWRITABLE state.
+ if (!areComponentsReadableWritableState(immutableComponents)) {
+ throw new IllegalStateException();
+ }
+ // schedule a merge operation
+ boolean isMergeTriggered = scheduleMerge(index);
+ if (!isMergeTriggered) {
+ throw new IllegalStateException();
+ }
+ return true;
+ }
+ }
+
+ /**
+ * This method returns whether there is an ongoing merge operation or not by checking
+ * each component state of given components.
+ *
+ * @param immutableComponents
+ * @return true if there is an ongoing merge operation, false otherwise.
+ */
+ private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) {
+ int size = immutableComponents.size();
+ for (int i = 0; i < size; i++) {
+ if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * This method returns the number of mergable components among the given list
+ * of immutable components that are ordered from the latest component to order ones. A caller
+ * need to make sure the order in the list.
+ *
+ * @param immutableComponents
+ * @return the number of mergable component
+ */
+ private int getMergableImmutableComponentCount(List<ILSMComponent> immutableComponents) {
+ int count = 0;
+ for (ILSMComponent c : immutableComponents) {
+ long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+ //stop when the first non-mergable component is found.
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize) {
+ break;
+ }
+ ++count;
+ }
+ return count;
+ }
+
+ /**
+ * checks whether all given components are of READABLE_UNWRITABLE state
+ *
+ * @param immutableComponents
+ * @return true if all components are of READABLE_UNWRITABLE state, false otherwise.
+ */
+ private boolean areComponentsReadableWritableState(List<ILSMComponent> immutableComponents) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * schedule a merge operation according to this prefix merge policy
+ *
+ * @param index
+ * @return true if merge is scheduled, false otherwise.
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
+ private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException, IndexException {
// 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the prefix of the sequence of
// all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component.
// 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule
@@ -49,17 +229,6 @@
// Reverse the components order so that we look at components from oldest to newest.
Collections.reverse(immutableComponents);
- for (ILSMComponent c : immutableComponents) {
- if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
- return;
- }
- }
- if (fullMergeIsRequested) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- accessor.scheduleFullMerge(index.getIOOperationCallback());
- return;
- }
long totalSize = 0;
int startIndex = -1;
for (int i = 0; i < immutableComponents.size(); i++) {
@@ -80,17 +249,13 @@
}
// Reverse the components order back to its original order
Collections.reverse(mergableComponents);
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
- break;
+ return true;
}
}
+ return false;
}
- @Override
- public void configure(Map<String, String> properties) {
- maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
- maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
- }
}