[ASTERIXDB-2104][STO] Optimization for Correlated Policy
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Previously, we introduced an optimization to the prefix merge policy
by ensuring component size grows exponentially after merging. This patch
applies the same optimization to CorrelatedMergePolicy
- A little refactoring of PrefixMergePolicy and CorrelatedMergePolicy
for code reuse
Change-Id: Icd84c77f1e2c34c410508fbc9de70156224ce932
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2019
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index d28b991..5bb9d49 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -22,26 +22,20 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
-public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
-
- private long maxMergableComponentSize;
- private int maxToleranceComponentCount;
+public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy {
private final IDatasetLifecycleManager datasetLifecycleManager;
private final int datasetId;
@@ -61,37 +55,9 @@
// 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule
// a merge all of the current candidates into a new single component.
- if (fullMergeIsRequested) {
- //full merge request is handled by each index separately, since it is possible that
- //when a primary index wants to send full merge requests for all secondaries,
- //one secondary index is being merged and the request cannot be scheduled
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
- if (!areComponentsReadableUnwritableState(immutableComponents)) {
- return;
- }
-
- ILSMIndexAccessor accessor =
- index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFullMerge(index.getIOOperationCallback());
- return;
+ if (fullMergeIsRequested || index.isPrimaryIndex()) {
+ super.diskComponentAdded(index, fullMergeIsRequested);
}
-
- if (!index.isPrimaryIndex()) {
- return;
- }
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
- if (!areComponentsReadableUnwritableState(immutableComponents)) {
- return;
- }
- scheduleMerge(index);
- }
-
- @Override
- public void configure(Map<String, String> properties) {
- maxMergableComponentSize =
- Long.parseLong(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE));
- maxToleranceComponentCount =
- Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT));
}
/**
@@ -101,106 +67,33 @@
*/
@Override
public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException {
- /**
- * case 1.
- * if mergableImmutableCommponentCount < threshold,
- * merge operation is not lagged ==> return false.
- * case 2.
- * if a) mergableImmutableCommponentCount >= threshold && b) there is an ongoing merge,
- * merge operation is lagged. ==> return true.
- * case 3. *SPECIAL CASE*
- * if a) mergableImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge,
- * merge operation is lagged. ==> *schedule a merge operation* and then return true.
- * This is a special case that requires to schedule a merge operation.
- * Otherwise, all flush operations will be hung.
- * This case can happen in a following situation:
- * The system may crash when
- * condition 1) the mergableImmutableCommponentCount >= threshold and
- * condition 2) merge operation is going on.
- * After the system is recovered, still condition 1) is true.
- * If there are flush operations in the same dataset partition after the recovery,
- * all these flush operations may not proceed since there is no ongoing merge and
- * there will be no new merge either in this situation.
- * Note for case 3, we only let the primary index to schedule merge operations on behalf
- * of all indexes.
- */
-
- List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents();
- int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
-
- // [case 1]
- if (mergableImmutableComponentCount < maxToleranceComponentCount) {
- return false;
- }
-
- boolean isMergeOngoing = isMergeOngoing(immutableComponents);
-
- if (isMergeOngoing) {
- // [case 2]
- return true;
- }
-
if (index.isPrimaryIndex()) {
- // [case 3]
- // make sure that all components are of READABLE_UNWRITABLE state.
- if (!areComponentsReadableUnwritableState(immutableComponents)) {
- throw new IllegalStateException();
- }
- // schedule a merge operation
- boolean isMergeTriggered = scheduleMerge(index);
- if (!isMergeTriggered) {
- throw new IllegalStateException();
- }
- return true;
+ return super.isMergeLagging(index);
} else {
- //[case 3]
- //if the index is secondary then ignore the merge request (since the merge should be
- //triggered by the primary) and here we simply treat it as not lagged.
return false;
}
+
}
- private boolean scheduleMerge(ILSMIndex index) throws HyracksDataException {
+ @Override
+ protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ // Reverse the components order so that we look at components from oldest to newest.
Collections.reverse(immutableComponents);
- long totalSize = 0;
- int startIndex = -1;
-
- int numComponents = immutableComponents.size();
-
- for (int i = 0; i < numComponents; i++) {
- ILSMComponent c = immutableComponents.get(i);
- long componentSize = ((ILSMDiskComponent) c).getComponentSize();
- if (componentSize > maxMergableComponentSize || ((ILSMDiskComponent) c).getComponentId().notFound()) {
- startIndex = i;
- totalSize = 0;
- continue;
- }
- totalSize += componentSize;
- boolean isLastComponent = i + 1 == numComponents ? true : false;
- if (totalSize > maxMergableComponentSize
- || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
- //merge disk components from startIndex+1 to i
- long minID = Long.MAX_VALUE;
- long maxID = Long.MIN_VALUE;
- for (int j = startIndex + 1; j <= i; j++) {
- ILSMDiskComponentId id = immutableComponents.get(j).getComponentId();
- if (minID > id.getMinId()) {
- minID = id.getMinId();
- }
- if (maxID < id.getMaxId()) {
- maxID = id.getMaxId();
- }
- }
- Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
- int partition = getIndexPartition(index, indexInfos);
- triggerScheduledMerge(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition)
- .collect(Collectors.toSet()));
- return true;
- }
+ Pair<Integer, Integer> mergeableIndexes = getMergableComponentsIndex(immutableComponents);
+ if (mergeableIndexes == null) {
+ //nothing to merge
+ return false;
}
- return false;
+ long minID = immutableComponents.get(mergeableIndexes.getLeft()).getComponentId().getMinId();
+ long maxID = immutableComponents.get(mergeableIndexes.getRight()).getComponentId().getMaxId();
+
+ Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
+ int partition = getIndexPartition(index, indexInfos);
+ triggerScheduledMerge(minID, maxID,
+ indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet()));
+ return true;
}
/**
@@ -224,15 +117,13 @@
List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
for (ILSMDiskComponent component : immutableComponents) {
ILSMDiskComponentId id = component.getComponentId();
- if (!id.notFound()) {
- if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
- mergableComponents.add(component);
- }
- if (id.getMaxId() < minID) {
- //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs)
- //if the component.maxID < minID, we can safely skip the rest disk components in the list
- break;
- }
+ if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
+ mergableComponents.add(component);
+ }
+ if (id.getMaxId() < minID) {
+ //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs)
+ //if the component.maxID < minID, we can safely skip the rest disk components in the list
+ break;
}
}
ILSMIndexAccessor accessor =
@@ -241,62 +132,6 @@
}
}
- /**
- * This method returns the number of mergable components among the given list
- * of immutable components that are ordered from the latest component to order ones. A caller
- * need to make sure the order in the list.
- *
- * @param immutableComponents
- * @return the number of mergable component
- * @throws HyracksDataException
- */
- private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents)
- throws HyracksDataException {
- int count = 0;
- for (ILSMComponent c : immutableComponents) {
- long componentSize = ((ILSMDiskComponent) c).getComponentSize();
- //stop when the first non-mergable component is found.
- if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize
- || ((ILSMDiskComponent) c).getComponentId().notFound()) {
- break;
- }
- ++count;
- }
- return count;
- }
-
- /**
- * This method returns whether there is an ongoing merge operation or not by checking
- * each component state of given components.
- *
- * @param immutableComponents
- * @return true if there is an ongoing merge operation, false otherwise.
- */
- private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
- int size = immutableComponents.size();
- for (int i = 0; i < size; i++) {
- if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * checks whether all given components are of READABLE_UNWRITABLE state
- *
- * @param immutableComponents
- * @return true if all components are of READABLE_UNWRITABLE state, false otherwise.
- */
- private boolean areComponentsReadableUnwritableState(List<ILSMDiskComponent> immutableComponents) {
- for (ILSMComponent c : immutableComponents) {
- if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
- return false;
- }
- }
- return true;
- }
-
private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
for (IndexInfo info : indexInfos) {
if (info.getIndex() == index) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index 3c141bc..25242d4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -19,27 +19,19 @@
package org.apache.asterix.common.context;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
-public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactory {
+public class CorrelatedPrefixMergePolicyFactory extends PrefixMergePolicyFactory {
private static final long serialVersionUID = 1L;
public static final String NAME = "correlated-prefix";
public static final String KEY_DATASET_ID = "datasetId";
- public static final String KEY_MAX_COMPONENT_SIZE = "max-mergable-component-size";
- public static final String KEY_MAX_COMPONENT_COUNT = "max-tolerance-component-count";
-
- private static final String[] SET_VALUES = new String[] { KEY_MAX_COMPONENT_SIZE, KEY_MAX_COMPONENT_COUNT };
- private static final Set<String> PROPERTIES_NAMES = new HashSet<>(Arrays.asList(SET_VALUES));
@Override
public String getName() {
@@ -47,11 +39,6 @@
}
@Override
- public Set<String> getPropertiesNames() {
- return PROPERTIES_NAMES;
- }
-
- @Override
public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, INCServiceContext ctx) {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index e36f4a8..c18ecc2 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -29,7 +29,6 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
-import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.context.IndexInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -154,69 +153,6 @@
}
@Test
- public void testPrimaryNotFound() {
- try {
- List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
- new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
- new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND),
- new LSMDiskComponentId(10, 19));
- List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
- IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
-
- List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
- new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24));
- List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
- IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
-
- ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
-
- policy.diskComponentAdded(secondary.getIndex(), false);
- Assert.assertTrue(resultPrimaryIDs.isEmpty());
- Assert.assertTrue(resultSecondaryIDs.isEmpty());
-
- policy.diskComponentAdded(primary.getIndex(), false);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(40, 50), new LSMDiskComponentId(30, 35),
- new LSMDiskComponentId(25, 29)), resultPrimaryIDs);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29)),
- resultSecondaryIDs);
- } catch (HyracksDataException e) {
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
- public void testSecondaryNotFound() {
- try {
- List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
- new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
- new LSMDiskComponentId(10, 19));
- List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
- IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
-
- List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
- new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND),
- new LSMDiskComponentId(20, 24));
- List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
- IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
-
- ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
-
- policy.diskComponentAdded(secondary.getIndex(), false);
- Assert.assertTrue(resultPrimaryIDs.isEmpty());
- Assert.assertTrue(resultSecondaryIDs.isEmpty());
-
- policy.diskComponentAdded(primary.getIndex(), false);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
- new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(20, 24)),
- resultSecondaryIDs);
-
- } catch (HyracksDataException e) {
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
public void testMultiPartition() {
try {
List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
@@ -251,8 +187,8 @@
private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
Map<String, String> properties = new HashMap<>();
- properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT, String.valueOf(MAX_COMPONENT_COUNT));
- properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE, String.valueOf(MAX_COMPONENT_SIZE));
+ properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
+ properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
Set<IndexInfo> indexInfos = new HashSet<>();
for (IndexInfo info : indexes) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 329e4fb..6878910 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -35,8 +35,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
public class PrefixMergePolicy implements ILSMMergePolicy {
- private long maxMergableComponentSize;
- private int maxToleranceComponentCount;
+ protected long maxMergableComponentSize;
+ protected int maxToleranceComponentCount;
/**
* This parameter is used to avoid merging a big component with a sequence of small components.
@@ -178,7 +178,7 @@
* @param immutableComponents
* @return true if there is an ongoing merge operation, false otherwise.
*/
- private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
+ protected boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
int size = immutableComponents.size();
for (int i = 0; i < size; i++) {
if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
@@ -196,7 +196,7 @@
* @param immutableComponents
* @return the number of mergable component
*/
- private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) {
+ protected int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) {
Pair<Integer, Integer> mergableIndexes = getMergableComponentsIndex(immutableComponents);
return mergableIndexes == null ? 0 : mergableIndexes.getRight() - mergableIndexes.getLeft() + 1;
}
@@ -207,7 +207,7 @@
* @param immutableComponents
* @return true if all components are of READABLE_UNWRITABLE state, false otherwise.
*/
- private boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) {
+ protected boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) {
for (ILSMComponent c : immutableComponents) {
if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
return false;
@@ -224,21 +224,21 @@
* @throws HyracksDataException
* @throws IndexException
*/
- private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
+ protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
// Reverse the components order so that we look at components from oldest to newest.
Collections.reverse(immutableComponents);
Pair<Integer, Integer> mergeableIndexes = getMergableComponentsIndex(immutableComponents);
if (mergeableIndexes != null) {
- scheduleMerge(index, immutableComponents, mergeableIndexes.getLeft(), mergeableIndexes.getRight());
+ triggerScheduleMerge(index, immutableComponents, mergeableIndexes.getLeft(), mergeableIndexes.getRight());
return true;
} else {
return false;
}
}
- private void scheduleMerge(ILSMIndex index, List<ILSMDiskComponent> immutableComponents, int startIndex,
+ private void triggerScheduleMerge(ILSMIndex index, List<ILSMDiskComponent> immutableComponents, int startIndex,
int endIndex) throws HyracksDataException {
List<ILSMDiskComponent> mergableComponents =
new ArrayList<>(immutableComponents.subList(startIndex, endIndex + 1));
@@ -265,7 +265,7 @@
* @return a pair of indexes indicating the start and end position of the sequence
* otherwise, return null if no sequence is found
*/
- private Pair<Integer, Integer> getMergableComponentsIndex(List<ILSMDiskComponent> immutableComponents) {
+ protected Pair<Integer, Integer> getMergableComponentsIndex(List<ILSMDiskComponent> immutableComponents) {
int numComponents = immutableComponents.size();
for (int i = 0; i < numComponents; i++) {
if (immutableComponents.get(i).getComponentSize() > maxMergableComponentSize