[ASTERIXDB-2453] Add Improved Constant Merge Policy
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- The current constant merge policy is unsuable because of its high
merge cost, i.e., O(N*N) where N is the number of flushes. This patch
replaces the previous constant merge policy with a more efficient policy
that still enforces a maximum number of components but greatly lowers
the merge cost.
- Extend AbstractLSMIndex with a method to return the total number of
flushes, based on the file name sequencer. This is required by the new
policy.
Change-Id: Ie5f83a4d5fdd3f036b823c906df1760f5110ae0a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2971
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 9199fbb..d3133ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -20,12 +20,14 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -901,4 +903,17 @@
protected abstract ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException;
protected abstract ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException;
+
+ public Optional<Long> getLatestDiskComponentSequence() {
+ if (diskComponents.isEmpty()) {
+ return Optional.empty();
+ }
+ final ILSMDiskComponent latestDiskComponent = diskComponents.get(0);
+ final Set<String> diskComponentPhysicalFiles = latestDiskComponent.getLSMComponentPhysicalFiles();
+ final String fileName = diskComponentPhysicalFiles.stream().findAny()
+ .orElseThrow(() -> new IllegalStateException("Disk component without any physical files"));
+ return Optional
+ .of(IndexComponentFileReference.of(Paths.get(fileName).getFileName().toString()).getSequenceEnd());
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index c642d82..5b71770 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -19,41 +19,120 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.common.IIndexAccessParameters;
public class ConstantMergePolicy implements ILSMMergePolicy {
private int numComponents;
+ private int[][] binomial;
+
@Override
public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException {
- List<ILSMDiskComponent> immutableComponents = index.getDiskComponents();
-
- if (!areComponentsMergable(immutableComponents)) {
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents());
+ if (!areComponentsReadableWritableState(immutableComponents)) {
return;
}
-
if (fullMergeIsRequested) {
- IIndexAccessParameters iap =
- new IndexAccessParameters(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- ILSMIndexAccessor accessor = index.createAccessor(iap);
- accessor.scheduleFullMerge();
- } else if (immutableComponents.size() >= numComponents) {
ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleMerge(immutableComponents);
+ accessor.scheduleFullMerge();
+ return;
}
+ scheduleMerge(index);
+ }
+
+ private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
+ Optional<Long> latestSeq = ((AbstractLSMIndex) index).getLatestDiskComponentSequence();
+ if (!latestSeq.isPresent()) {
+ return false;
+ }
+ // sequence number starts from 0, and thus latestSeq + 1 gives the number of flushes
+ int numFlushes = latestSeq.get().intValue() + 1;
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents());
+ Collections.reverse(immutableComponents);
+ int size = immutableComponents.size();
+ int depth = 0;
+ while (treeDepth(depth) < numFlushes) {
+ depth++;
+ }
+ int mergedIndex =
+ binomialIndex(depth, Math.min(depth, numComponents) - 1, numFlushes - treeDepth(depth - 1) - 1);
+ if (mergedIndex == size - 1) {
+ return false;
+ }
+ long mergeSize = 0;
+ List<ILSMDiskComponent> mergableComponents = new ArrayList<ILSMDiskComponent>();
+ for (int i = mergedIndex; i < immutableComponents.size(); i++) {
+ mergeSize = mergeSize + immutableComponents.get(i).getComponentSize();
+ mergableComponents.add(immutableComponents.get(i));
+ }
+ Collections.reverse(mergableComponents);
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.scheduleMerge(mergableComponents);
+ return true;
+ }
+
+ private int treeDepth(int d) {
+ if (d < 0) {
+ return 0;
+ }
+ return treeDepth(d - 1) + binomialChoose(d + Math.min(d, numComponents) - 1, d);
+ }
+
+ private int binomialIndex(int d, int h, int t) {
+ if (t < 0 || t > binomialChoose(d + h, h)) {
+ throw new IllegalStateException("Illegal binomial values");
+ }
+ if (t == 0) {
+ return 0;
+ } else if (t < binomialChoose(d + h - 1, h)) {
+ return binomialIndex(d - 1, h, t);
+ }
+ return binomialIndex(d, h - 1, t - binomialChoose(d + h - 1, h)) + 1;
+ }
+
+ private int binomialChoose(int n, int k) {
+ if (k < 0 || k > n) {
+ return 0;
+ }
+ if (k == 0 || k == n) {
+ return 1;
+ }
+ // For efficiency, binomial is persisted to avoid re-computations for every merge
+ if (binomial == null || binomial.length <= n) {
+ binomial = new int[n + 1][n + 1];
+ for (int r = 0; r <= n; r++) {
+ for (int c = 0; c <= r; c++) {
+ if (c == 0 || c == r) {
+ binomial[r][c] = 1;
+ } else {
+ binomial[r][c] = binomial[r - 1][c - 1] + binomial[r - 1][c];
+ }
+ }
+ }
+ }
+ return binomial[n][k];
+ }
+
+ private boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return false;
+ }
+ }
+ return true;
}
@Override
@@ -63,77 +142,11 @@
@Override
public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException {
- // see PrefixMergePolicy.isMergeLagging() for the rationale behind this code.
-
- /**
- * case 1.
- * if totalImmutableCommponentCount < threshold,
- * merge operation is not lagged ==> return false.
- * case 2.
- * if a) totalImmutableCommponentCount >= threshold && b) there is an ongoing merge,
- * merge operation is lagged. ==> return true.
- * case 3. *SPECIAL CASE*
- * if a) totalImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge,
- * merge operation is lagged. ==> *schedule a merge operation* and then return true.
- * This is a special case that requires to schedule a merge operation.
- * Otherwise, all flush operations will be hung.
- * This case can happen in a following situation:
- * The system may crash when
- * condition 1) the mergableImmutableCommponentCount >= threshold and
- * condition 2) merge operation is going on.
- * After the system is recovered, still condition 1) is true.
- * If there are flush operations in the same dataset partition after the recovery,
- * all these flush operations may not proceed since there is no ongoing merge and
- * there will be no new merge either in this situation.
- */
-
+ // TODO: for now, we simply block the ingestion when there is an ongoing merge
List<ILSMDiskComponent> immutableComponents = index.getDiskComponents();
- int totalImmutableComponentCount = immutableComponents.size();
-
- // [case 1]
- if (totalImmutableComponentCount < numComponents) {
- return false;
- }
-
- boolean isMergeOngoing = isMergeOngoing(immutableComponents);
-
- // here, implicitly (totalImmutableComponentCount >= numComponents) is true by passing case 1.
- if (isMergeOngoing) {
- // [case 2]
- return true;
- } else {
- // [case 3]
- // schedule a merge operation after making sure that all components are mergable
- if (!areComponentsMergable(immutableComponents)) {
- throw new IllegalStateException();
- }
- ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleMerge(immutableComponents);
- return true;
- }
+ return isMergeOngoing(immutableComponents);
}
- /**
- * checks whether all given components are mergable or not
- *
- * @param immutableComponents
- * @return true if all components are mergable, false otherwise.
- */
- private boolean areComponentsMergable(List<ILSMDiskComponent> immutableComponents) {
- for (ILSMComponent c : immutableComponents) {
- if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * This method returns whether there is an ongoing merge operation or not by checking
- * each component state of given components.
- *
- * @return true if there is an ongoing merge operation, false otherwise.
- */
private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
int size = immutableComponents.size();
for (int i = 0; i < size; i++) {