Avoid always merging old components in prefix policy
Current, the prefix policy always looks at the components from
oldest to newest to schedule merge. One negative consequence is that
the oldest (largest) component gets merged over and over again
until it reaches the size limit. This is undesirable since it takes
O(n^2) disk IOs (n is the number of flushed components) to produce a
final component.
This patch is a temporary fix of this behavior, taken from the idea of
HBase compaction policy (https://www.ngdata.com/visualizing-hbase
-flushes-and-compactions/). The basic idea is that it introduces
some size factor (for now it's 1.2) to control the merge behavior.
When the prefix policy finds a sequence of components to merge,
we also check the oldest (largest) component in the sequence should
be smaller than 1.2*the total size of all younger components.
By doing so, we can avoid merging oldest components over and over again,
making the disk IOs O(nlog n).
Change-Id: I464da3fed38cded0aee7b319a35664eae069a2ba
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1818
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 23646b9..329e4fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -34,14 +35,24 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
public class PrefixMergePolicy implements ILSMMergePolicy {
-
private long maxMergableComponentSize;
private int maxToleranceComponentCount;
+ /**
+ * This parameter is used to avoid merging a big component with a sequence of small components.
+ * If a component is larger than ratio * all younger disk components in the merge list, then
+ * this old (large) component is ignored in this round.
+ * Since it's a temporary fix, we don't set this parameter as configurable in order not to
+ * disturb the users.
+ * This number is taken from HBase compaction policy
+ * see https://www.ngdata.com/visualizing-hbase-flushes-and-compactions/
+ */
+ private final static double MAX_MERGABLE_COMPONENT_SIZE_RATIO = 1.2;
+
@Override
public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException {
- ArrayList<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
if (!areComponentsReadableWritableState(immutableComponents)) {
return;
@@ -129,7 +140,9 @@
* there will be no new merge either in this situation.
*/
- List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents();
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ // reverse the list so that we look from the oldest to the newest components
+ Collections.reverse(immutableComponents);
int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
// [case 1]
@@ -177,23 +190,15 @@
/**
* This method returns the number of mergable components among the given list
- * of immutable components that are ordered from the latest component to order ones. A caller
+ * of immutable components that are ordered from the oldest component to newer ones. A caller
* need to make sure the order in the list.
*
* @param immutableComponents
* @return the number of mergable component
*/
private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) {
- int count = 0;
- for (ILSMComponent c : immutableComponents) {
- long componentSize = ((ILSMDiskComponent) c).getComponentSize();
- //stop when the first non-mergable component is found.
- if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize) {
- break;
- }
- ++count;
- }
- return count;
+ Pair<Integer, Integer> mergableIndexes = getMergableComponentsIndex(immutableComponents);
+ return mergableIndexes == null ? 0 : mergableIndexes.getRight() - mergableIndexes.getLeft() + 1;
}
/**
@@ -220,41 +225,94 @@
* @throws IndexException
*/
private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
- // 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the prefix of the sequence of
- // all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component.
- // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule
- // a merge all of the current candidates into a new single component.
List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
// Reverse the components order so that we look at components from oldest to newest.
Collections.reverse(immutableComponents);
- long totalSize = 0;
- int startIndex = -1;
- for (int i = 0; i < immutableComponents.size(); i++) {
- ILSMComponent c = immutableComponents.get(i);
- long componentSize = ((ILSMDiskComponent) c).getComponentSize();
- if (componentSize > maxMergableComponentSize) {
- startIndex = i;
- totalSize = 0;
+ Pair<Integer, Integer> mergeableIndexes = getMergableComponentsIndex(immutableComponents);
+ if (mergeableIndexes != null) {
+ scheduleMerge(index, immutableComponents, mergeableIndexes.getLeft(), mergeableIndexes.getRight());
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void scheduleMerge(ILSMIndex index, List<ILSMDiskComponent> immutableComponents, int startIndex,
+ int endIndex) throws HyracksDataException {
+ List<ILSMDiskComponent> mergableComponents =
+ new ArrayList<>(immutableComponents.subList(startIndex, endIndex + 1));
+
+ // Reverse the components order back to its original order
+ Collections.reverse(mergableComponents);
+ ILSMIndexAccessor accessor =
+ index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
+ }
+
+ /**
+ * Given a list of disk components (ordered from oldest to newest), this function
+ * identify a sequence of components to be merged. It works logically as follows:
+ * 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the
+ * prefix of the sequence of all such components for which the sum of their sizes exceeds MaxMrgCompSz.
+ * 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds
+ * MaxTolCompCnt.
+ * 3. If we find a sequence from 1 or 2, and the first (oldest) component in the sequence is smaller than
+ * ratio * total size of the younger components in the sequence, schedule a merge of all sequences.
+ * Otherwise, go back to step 1 with the next component.
+ *
+ * @param immutableComponents
+ * @return a pair of indexes indicating the start and end position of the sequence
+ * otherwise, return null if no sequence is found
+ */
+ private Pair<Integer, Integer> getMergableComponentsIndex(List<ILSMDiskComponent> immutableComponents) {
+ int numComponents = immutableComponents.size();
+ for (int i = 0; i < numComponents; i++) {
+ if (immutableComponents.get(i).getComponentSize() > maxMergableComponentSize
+ || immutableComponents.get(i).getState() != ComponentState.READABLE_UNWRITABLE) {
continue;
}
- totalSize += componentSize;
- boolean isLastComponent = i + 1 == immutableComponents.size() ? true : false;
- if (totalSize > maxMergableComponentSize
- || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
- List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
- for (int j = startIndex + 1; j <= i; j++) {
- mergableComponents.add(immutableComponents.get(j));
+ long startComponentSize = immutableComponents.get(i).getComponentSize();
+
+ long totalSize = startComponentSize;
+ int j = i + 1;
+ boolean mergeable = true;
+ for (; j < numComponents; j++) {
+ long componentSize = immutableComponents.get(j).getComponentSize();
+ if (componentSize > maxMergableComponentSize
+ || immutableComponents.get(j).getState() != ComponentState.READABLE_UNWRITABLE) {
+ // skip unmergeable components if any
+ break;
}
- // Reverse the components order back to its original order
- Collections.reverse(mergableComponents);
- ILSMIndexAccessor accessor =
- index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
- return true;
+ totalSize += componentSize;
+ mergeable = startComponentSize < MAX_MERGABLE_COMPONENT_SIZE_RATIO * (totalSize - startComponentSize);
+ if (totalSize > maxMergableComponentSize && mergeable) {
+ // If the ratio check passes and total size exceeds the threshold, we return this sequence
+ return Pair.of(i, j);
+ }
+ // Stops search if the ratio threshold cannot be met.
+ // Since components are ordered from older to newer, newer (later) components
+ // would have smaller sizes than the current component size.
+ if (startComponentSize >= MAX_MERGABLE_COMPONENT_SIZE_RATIO
+ * (totalSize + componentSize * (numComponents - j - 1) - startComponentSize)) {
+ break;
+ }
+ }
+ if (j != numComponents) {
+ continue;
+ }
+ if (numComponents - i >= maxToleranceComponentCount && mergeable) {
+ // If it's the last component, component count exceeds the threshold, the ratio check passes,
+ // then we return this sequence.
+ return Pair.of(i, numComponents - 1);
+ }
+ // if we reach the last component, but there are not enough components to merge,
+ // then we don't need to check i+1 th component
+ if (numComponents - i < maxToleranceComponentCount) {
+ return null;
}
}
- return false;
+ return null;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml
index 9ce4237..06907d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/pom.xml
@@ -88,5 +88,11 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>2.0.2-beta</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
new file mode 100644
index 0000000..04dc736
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/PrefixMergePolicyTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import junit.framework.TestCase;
+
+public class PrefixMergePolicyTest extends TestCase {
+
+ private final int MAX_COMPONENT_SIZE = 100;
+
+ private final int MAX_COMPONENT_COUNT = 3;
+
+ public void testBasic() {
+ try {
+ List<Long> sizes = new ArrayList<>(Arrays.asList(1L, 2L, 3L));
+ List<Long> resultSizes = new ArrayList<>();
+ ILSMIndex index = mockIndex(sizes, resultSizes);
+ ILSMMergePolicy policy = mockMergePolicy();
+ policy.diskComponentAdded(index, false);
+ assertEquals(3, resultSizes.size());
+ assertEquals(Arrays.asList(1L, 2L, 3L), resultSizes);
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSmallComponents() {
+ try {
+ List<Long> sizes = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
+ List<Long> resultSizes = new ArrayList<>();
+ ILSMIndex index = mockIndex(sizes, resultSizes);
+ ILSMMergePolicy policy = mockMergePolicy();
+ policy.diskComponentAdded(index, false);
+ assertEquals(5, resultSizes.size());
+ assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L), resultSizes);
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSkipComponents() {
+ try {
+ List<Long> sizes = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 4L, 101L));
+ List<Long> resultSizes = new ArrayList<>();
+ ILSMIndex index = mockIndex(sizes, resultSizes);
+ ILSMMergePolicy policy = mockMergePolicy();
+ policy.diskComponentAdded(index, false);
+ assertEquals(4, resultSizes.size());
+ assertEquals(Arrays.asList(1L, 2L, 3L, 4L), resultSizes);
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSkipLargeComponents() {
+ try {
+ List<Long> sizes = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 4L, 50L));
+ List<Long> resultSizes = new ArrayList<>();
+ ILSMIndex index = mockIndex(sizes, resultSizes);
+ ILSMMergePolicy policy = mockMergePolicy();
+ policy.diskComponentAdded(index, false);
+ assertEquals(4, resultSizes.size());
+ assertEquals(Arrays.asList(1L, 2L, 3L, 4L), resultSizes);
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMergeLargeComponents() {
+ try {
+ List<Long> sizes = new ArrayList<>(Arrays.asList(3L, 3L, 45L, 50L));
+ List<Long> resultSizes = new ArrayList<>();
+ ILSMIndex index = mockIndex(sizes, resultSizes);
+ ILSMMergePolicy policy = mockMergePolicy();
+ policy.diskComponentAdded(index, false);
+ assertEquals(4, resultSizes.size());
+ assertEquals(Arrays.asList(3L, 3L, 45L, 50L), resultSizes);
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAvoidMergeLargeComponents() {
+ try {
+ // in this case, we can
+ List<Long> sizes = new ArrayList<>(Arrays.asList(28L, 29L, 30L, 90L));
+ List<Long> resultSizes = new ArrayList<>();
+ ILSMIndex index = mockIndex(sizes, resultSizes);
+ ILSMMergePolicy policy = mockMergePolicy();
+ policy.diskComponentAdded(index, false);
+ //assertEquals(3, resultSizes.size());
+ assertEquals(Arrays.asList(28L, 29L, 30L, 90L), resultSizes);
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPerformance() {
+ try {
+ List<Long> sizes = new ArrayList<>();
+ ILSMMergePolicy policy = mockMergePolicy();
+ int maxNumComponents = 0;
+ int sumComponents = 0;
+ Set<Long> writeAmplifications = new HashSet<>();
+ int pass = 0;
+ do {
+ pass++;
+ sizes.add(0, 1L);
+ ILSMIndex index = mockIndex(sizes, null);
+ policy.diskComponentAdded(index, false);
+ int length = sizes.size();
+ maxNumComponents = maxNumComponents >= length ? maxNumComponents : length;
+ sumComponents += length;
+ writeAmplifications.add(sizes.get(sizes.size() - 1));
+ } while (sizes.get(sizes.size() - 1) < MAX_COMPONENT_SIZE);
+ writeAmplifications.remove(1L);
+ Assert.assertTrue(maxNumComponents <= 6);
+
+ //average disk components per pass
+ Assert.assertTrue(sumComponents / pass <= 3);
+
+ Assert.assertTrue(writeAmplifications.size() <= 7);
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private ILSMMergePolicy mockMergePolicy() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
+ properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
+ ILSMMergePolicy policy = new PrefixMergePolicy();
+ policy.configure(properties);
+ return policy;
+ }
+
+ private ILSMIndex mockIndex(List<Long> componentSizes, List<Long> mergedSizes) throws HyracksDataException {
+ List<ILSMDiskComponent> components = new ArrayList<>();
+ for (Long size : componentSizes) {
+ ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
+ Mockito.when(component.getComponentSize()).thenReturn(size);
+ Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE);
+ components.add(component);
+ }
+
+ ILSMIndex index = Mockito.mock(ILSMIndex.class);
+ Mockito.when(index.getImmutableComponents()).thenReturn(components);
+
+ ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class);
+
+ Mockito.doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class);
+ if (mergedSizes != null) {
+ mergedComponents.forEach(component -> {
+ mergedSizes.add(component.getComponentSize());
+ });
+ }
+ long sum = 0;
+ for (ILSMDiskComponent c : mergedComponents) {
+ sum += c.getComponentSize();
+ }
+ ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
+ Mockito.when(component.getComponentSize()).thenReturn(sum);
+ Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE);
+ int swapIndex = components.indexOf(mergedComponents.get(0));
+ components.removeAll(mergedComponents);
+ components.add(swapIndex, component);
+ componentSizes.clear();
+ for (ILSMDiskComponent c : components) {
+ componentSizes.add(c.getComponentSize());
+ }
+ return null;
+ }
+ }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class),
+ Mockito.anyListOf(ILSMDiskComponent.class));
+
+ Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class),
+ Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor);
+
+ return index;
+ }
+
+}