Implemented Disk Components Alignment Based on IDs

- Added IDs (using LSN) for disk components. When a disk component is
flushed, its initial ID is set as the LSN. When components are merged,
the result ID is the union of all IDs to be merged.
- Changed the correlated merge policy to correlate disk components of
the primary and secondaries using IDs.

Change-Id: I768ee9ac0a8d3c99c631086093a6b778b2e7588e
Reviewed-by: abdullah alamoudi <>
Tested-by: Jenkins <>
BAD: Jenkins <>
Integration-Tests: Jenkins <>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
index cd1d95e..d28b991 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,9 +32,11 @@
 public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
@@ -57,66 +60,30 @@
         // all such components for which the sum of their sizes exceeds MaxMrgCompSz.  Schedule a merge of those components into a new component.
         // 2.  If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt.  If so, schedule
         // a merge all of the current candidates into a new single component.
-        List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
-        // Reverse the components order so that we look at components from oldest to newest.
-        Collections.reverse(immutableComponents);
-        for (ILSMDiskComponent c : immutableComponents) {
-            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+        if (fullMergeIsRequested) {
+            //full merge request is handled by each index separately, since it is possible that
+            //when a primary index wants to send full merge requests for all secondaries,
+            //one secondary index is being merged and the request cannot be scheduled
+            List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+            if (!areComponentsReadableUnwritableState(immutableComponents)) {
-        }
-        if (fullMergeIsRequested) {
             ILSMIndexAccessor accessor =
                     index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         if (!index.isPrimaryIndex()) {
-        long totalSize = 0;
-        int startIndex = -1;
-        int minNumComponents = Integer.MAX_VALUE;
-        Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetIndexes();
-        for (ILSMIndex lsmIndex : indexes) {
-            minNumComponents = Math.min(minNumComponents, lsmIndex.getImmutableComponents().size());
+        List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+        if (!areComponentsReadableUnwritableState(immutableComponents)) {
+            return;
-        for (int i = 0; i < minNumComponents; i++) {
-            ILSMComponent c = immutableComponents.get(i);
-            long componentSize = ((ILSMDiskComponent) c).getComponentSize();
-            if (componentSize > maxMergableComponentSize) {
-                startIndex = i;
-                totalSize = 0;
-                continue;
-            }
-            totalSize += componentSize;
-            boolean isLastComponent = i + 1 == minNumComponents ? true : false;
-            if (totalSize > maxMergableComponentSize
-                    || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
-                for (ILSMIndex lsmIndex : indexes) {
-                    List<ILSMDiskComponent> reversedImmutableComponents =
-                            new ArrayList<>(lsmIndex.getImmutableComponents());
-                    // Reverse the components order so that we look at components from oldest to newest.
-                    Collections.reverse(reversedImmutableComponents);
-                    List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
-                    for (int j = startIndex + 1; j <= i; j++) {
-                        mergableComponents.add(reversedImmutableComponents.get(j));
-                    }
-                    // Reverse the components order back to its original order
-                    Collections.reverse(mergableComponents);
-                    ILSMIndexAccessor accessor =
-                            lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-                    accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
-                }
-                break;
-            }
-        }
+        scheduleMerge(index);
@@ -127,9 +94,215 @@
+    /**
+     * Decides whether merging is lagging, using logic similar to that of {@link PrefixMergePolicy}.
+     *
+     * @throws HyracksDataException
+     */
-    public boolean isMergeLagging(ILSMIndex index) {
-        //TODO implement properly according to the merge policy
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException {
+        /**
+         * case 1.
+         * if mergableImmutableComponentCount < threshold,
+         * merge operation is not lagged ==> return false.
+         * case 2.
+         * if a) mergableImmutableComponentCount >= threshold && b) there is an ongoing merge,
+         * merge operation is lagged. ==> return true.
+         * case 3. *SPECIAL CASE*
+         * if a) mergableImmutableComponentCount >= threshold && b) there is *NO* ongoing merge,
+         * merge operation is lagged. ==> *schedule a merge operation* and then return true.
+         * This is a special case that requires scheduling a merge operation.
+         * Otherwise, all flush operations will hang.
+         * This case can happen in the following situation:
+         * The system may crash when
+         * condition 1) the mergableImmutableComponentCount >= threshold and
+         * condition 2) a merge operation is going on.
+         * After the system is recovered, condition 1) is still true.
+         * If there are flush operations in the same dataset partition after the recovery,
+         * all these flush operations may not proceed since there is no ongoing merge and
+         * there will be no new merge either in this situation.
+         * Note for case 3, we only let the primary index schedule merge operations on behalf
+         * of all indexes.
+         */
+        List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents();
+        int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
+        // [case 1]
+        if (mergableImmutableComponentCount < maxToleranceComponentCount) {
+            return false;
+        }
+        boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+        if (isMergeOngoing) {
+            // [case 2]
+            return true;
+        }
+        if (index.isPrimaryIndex()) {
+            // [case 3]
+            // make sure that all components are in READABLE_UNWRITABLE state; any other state is reported
+            // as an IllegalStateException.
+            if (!areComponentsReadableUnwritableState(immutableComponents)) {
+                throw new IllegalStateException();
+            }
+            // schedule a merge operation; not being able to trigger one is reported as an IllegalStateException.
+            boolean isMergeTriggered = scheduleMerge(index);
+            if (!isMergeTriggered) {
+                throw new IllegalStateException();
+            }
+            return true;
+        } else {
+            //[case 3]
+            //if the index is secondary then ignore the merge request (since the merge should be
+            //triggered by the primary) and here we simply treat it as not lagged.
+            return false;
+        }
+    }
+    /**
+     * Looks for a mergable run of disk components in the given index and, if one is found, submits
+     * merge requests for the matching ID range to the indexes of the same dataset partition.
+     * <p>
+     * Components are visited from oldest to newest. A component that is larger than
+     * {@code maxMergableComponentSize}, or whose ID is reported as not found, ends the current run;
+     * the next run starts right after it. A run is merged as soon as its accumulated size exceeds
+     * {@code maxMergableComponentSize}, or when the last component is reached and the run is at
+     * least {@code maxToleranceComponentCount} components long.
+     *
+     * @param index
+     *            the index whose disk components drive the merge decision
+     * @return true if merge requests were submitted, false if no mergable run was found
+     * @throws HyracksDataException
+     */
+    private boolean scheduleMerge(ILSMIndex index) throws HyracksDataException {
+        List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+        // Reverse the components order so that we look at components from oldest to newest.
+        Collections.reverse(immutableComponents);
+        long totalSize = 0;
+        int startIndex = -1;
+        int numComponents = immutableComponents.size();
+        for (int i = 0; i < numComponents; i++) {
+            ILSMDiskComponent c = immutableComponents.get(i);
+            long componentSize = c.getComponentSize();
+            if (componentSize > maxMergableComponentSize || c.getComponentId().notFound()) {
+                // this component cannot take part in a merge: restart the run after it
+                startIndex = i;
+                totalSize = 0;
+                continue;
+            }
+            totalSize += componentSize;
+            boolean isLastComponent = i + 1 == numComponents;
+            if (totalSize > maxMergableComponentSize
+                    || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
+                //merge disk components from startIndex+1 to i
+                long minID = Long.MAX_VALUE;
+                long maxID = Long.MIN_VALUE;
+                for (int j = startIndex + 1; j <= i; j++) {
+                    ILSMDiskComponentId id = immutableComponents.get(j).getComponentId();
+                    if (minID > id.getMinId()) {
+                        minID = id.getMinId();
+                    }
+                    if (maxID < id.getMaxId()) {
+                        maxID = id.getMaxId();
+                    }
+                }
+                Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
+                // NOTE(review): getIndexPartition returns -1 when the index is not in indexInfos; the filter
+                // below then presumably matches nothing while this method still returns true - confirm.
+                int partition = getIndexPartition(index, indexInfos);
+                // only the indexes that live in the same partition as the given index are merged
+                triggerScheduledMerge(minID, maxID,
+                        indexInfos.stream().filter(info -> info.getPartition() == partition)
+                                .collect(Collectors.toSet()));
+                return true;
+            }
+        }
         return false;
+    /**
+     * Submit merge requests for all disk components within [minID, maxID]
+     * for each of the given indexes. An index that already has a component
+     * in READABLE_MERGING state is skipped.
+     *
+     * @param minID
+     *            smallest component ID (inclusive) of the range to merge
+     * @param maxID
+     *            largest component ID (inclusive) of the range to merge
+     * @param indexInfos
+     *            the indexes to submit merge requests for
+     * @throws HyracksDataException
+     */
+    private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException {
+        for (IndexInfo info : indexInfos) {
+            ILSMIndex lsmIndex = info.getIndex();
+            List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getImmutableComponents());
+            if (isMergeOngoing(immutableComponents)) {
+                continue;
+            }
+            List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
+            for (ILSMDiskComponent component : immutableComponents) {
+                ILSMDiskComponentId id = component.getComponentId();
+                //components whose ID is not found are neither merged nor used to stop the scan
+                if (!id.notFound()) {
+                    //only components whose whole ID interval lies inside [minID, maxID] are merged
+                    if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
+                        mergableComponents.add(component);
+                    }
+                    if (id.getMaxId() < minID) {
+                        //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs)
+                        //if the component.maxID < minID, we can safely skip the rest disk components in the list
+                        break;
+                    }
+                }
+            }
+            //NOTE(review): the merge is scheduled even when mergableComponents is empty - confirm that
+            //scheduleMerge tolerates an empty list
+            ILSMIndexAccessor accessor =
+                    lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
+        }
+    }
+    /**
+     * This method returns the number of mergable components at the head of the given list
+     * of immutable components, which is expected to be ordered from the latest component to
+     * older ones. The caller must ensure this ordering.
+     * Counting stops at the first component that is not in READABLE_UNWRITABLE state, is larger
+     * than maxMergableComponentSize, or whose component ID is not found.
+     *
+     * @param immutableComponents
+     * @return the number of mergable components
+     * @throws HyracksDataException
+     */
+    private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents)
+            throws HyracksDataException {
+        int count = 0;
+        for (ILSMComponent c : immutableComponents) {
+            long componentSize = ((ILSMDiskComponent) c).getComponentSize();
+            //stop when the first non-mergable component is found.
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize
+                    || ((ILSMDiskComponent) c).getComponentId().notFound()) {
+                break;
+            }
+            ++count;
+        }
+        return count;
+    }
+    /**
+     * This method returns whether there is an ongoing merge operation or not by checking
+     * the state of each of the given components: a merge is considered ongoing as soon as
+     * one component is in READABLE_MERGING state.
+     *
+     * @param immutableComponents
+     *            the components to inspect
+     * @return true if there is an ongoing merge operation, false otherwise
+     *         (including when the list is empty).
+     */
+    private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
+        int size = immutableComponents.size();
+        for (int i = 0; i < size; i++) {
+            if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
+                return true;
+            }
+        }
+        return false;
+    }
+    /**
+     * Checks whether all given components are in READABLE_UNWRITABLE state.
+     *
+     * @param immutableComponents
+     *            the components to inspect
+     * @return true if all components are in READABLE_UNWRITABLE state (also for an empty list),
+     *         false as soon as one component is in any other state.
+     */
+    private boolean areComponentsReadableUnwritableState(List<ILSMDiskComponent> immutableComponents) {
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return false;
+            }
+        }
+        return true;
+    }
+    /**
+     * Looks up the partition of the given index among the given index infos.
+     *
+     * @param index
+     *            the index to look for; it is matched by reference (==), not by equals
+     * @param indexInfos
+     *            the candidate index infos
+     * @return the partition of the matching info, or -1 if no info refers to the given index
+     */
+    private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
+        for (IndexInfo info : indexInfos) {
+            if (info.getIndex() == index) {
+                return info.getPartition();
+            }
+        }
+        return -1;
+    }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
index 6a2cc56..71d4a96 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
@@ -76,6 +76,17 @@
         return datasetIndexes;
+    /**
+     * Collects the {@link IndexInfo}s of this dataset that are currently open.
+     *
+     * @return a newly created set holding every info from {@code getIndexes()} for which
+     *         {@code isOpen()} is true; the set is empty when no index is open
+     */
+    public synchronized Set<IndexInfo> getDatsetIndexInfos() {
+        Set<IndexInfo> infos = new HashSet<>();
+        for (IndexInfo iInfo : getIndexes().values()) {
+            if (iInfo.isOpen()) {
+                infos.add(iInfo);
+            }
+        }
+        return infos;
+    }
     public int compareTo(DatasetInfo i) {
         // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
index 9529366..51a535a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
@@ -93,12 +93,12 @@
     public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
         int did = getDIDfromResourcePath(resourcePath);
-        long resourceID = getResourceIDfromResourcePath(resourcePath);
+        LocalResource resource = resourceRepository.get(resourcePath);
         DatasetResource datasetResource = datasets.get(did);
         if (datasetResource == null) {
             datasetResource = getDatasetLifecycle(did);
-        datasetResource.register(resourceID, index);
+        datasetResource.register(resource, index);
     public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
index a880ce2..f2f3b93 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
@@ -20,9 +20,11 @@
 import java.util.Map;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
  * A dataset can be in one of two states { EVICTED , LOADED }.
@@ -41,8 +43,7 @@
     private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
     private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
-    public DatasetResource(DatasetInfo datasetInfo,
-            PrimaryIndexOperationTracker datasetPrimaryOpTracker,
+    public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker,
             DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
         this.datasetInfo = datasetInfo;
         this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
@@ -86,7 +87,8 @@
         return (iInfo == null) ? null : iInfo.getIndex();
-    public void register(long resourceID, IIndex index) throws HyracksDataException {
+    public void register(LocalResource resource, IIndex index) throws HyracksDataException {
+        long resourceID = resource.getId();
         if (!datasetInfo.isRegistered()) {
             synchronized (datasetInfo) {
                 if (!datasetInfo.isRegistered()) {
@@ -102,8 +104,8 @@
         if (index == null) {
             throw new HyracksDataException("Attempt to register a null index");
-        datasetInfo.getIndexes().put(resourceID,
-                new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(), resourceID));
+        datasetInfo.getIndexes().put(resourceID, new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(),
+                resourceID, ((DatasetLocalResource) resource.getResource()).getPartition()));
     public DatasetInfo getDatasetInfo() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
index 34ccce0..9eb5b6c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/
@@ -22,13 +22,15 @@
 public class IndexInfo extends Info {
     private final ILSMIndex index;
-    private final long resourceId;
     private final int datasetId;
+    private final long resourceId;
+    private final int partition;
-    public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
+    public IndexInfo(ILSMIndex index, int datasetId, long resourceId, int partition) {
         this.index = index;
         this.datasetId = datasetId;
         this.resourceId = resourceId;
+        this.partition = partition;
     public ILSMIndex getIndex() {
@@ -39,6 +41,10 @@
         return resourceId;
+    /**
+     * @return the partition number that was passed to the constructor of this index info
+     */
+    public int getPartition() {
+        return partition;
+    }
     public int getDatasetId() {
         return datasetId;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/
index ec7df85..f357aea 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/
@@ -20,6 +20,8 @@
 package org.apache.asterix.common.ioopcallbacks;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -27,13 +29,17 @@
 // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
 public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
+    private static final Logger logger = Logger.getLogger(AbstractLSMIOOperationCallback.class.getName());
     public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes());
     public static final long INVALID = -1L;
@@ -106,6 +112,42 @@
         return pointable.getLength() == 0 ? INVALID : pointable.longValue();
+    /**
+     * Computes the ID of a new disk component.
+     * <p>
+     * For a flush ({@code oldComponents == null}) both bounds of the ID are the value returned by
+     * {@code getComponentLSN(null)}; if that value is 0, a warning is logged and
+     * {@link ILSMDiskComponentId#NOT_FOUND} is used instead. For a merge, the ID spans from the
+     * smallest min ID to the largest max ID of the merged components.
+     *
+     * @param oldComponents
+     *            the components being merged, or null for a flush
+     * @return the ID for the new disk component
+     * @throws HyracksDataException
+     */
+    private ILSMDiskComponentId getComponentId(List<ILSMComponent> oldComponents) throws HyracksDataException {
+        if (oldComponents == null) {
+            //if oldComponents == null, then getComponentLSN would treat it as a flush operation,
+            //and return the LSN for the flushed component
+            long id = getComponentLSN(null);
+            if (id == 0) {
+                logger.log(Level.WARNING, "Flushing a memory component without setting the LSN");
+                id = ILSMDiskComponentId.NOT_FOUND;
+            }
+            return new LSMDiskComponentId(id, id);
+        } else {
+            //NOTE(review): an old component whose ID is NOT_FOUND takes part in the min/max below
+            //unchanged, and an empty list yields (Long.MAX_VALUE, Long.MIN_VALUE) - confirm both are intended
+            long minId = Long.MAX_VALUE;
+            long maxId = Long.MIN_VALUE;
+            for (ILSMComponent oldComponent : oldComponents) {
+                ILSMDiskComponentId oldComponentId = ((ILSMDiskComponent) oldComponent).getComponentId();
+                if (oldComponentId.getMinId() < minId) {
+                    minId = oldComponentId.getMinId();
+                }
+                if (oldComponentId.getMaxId() > maxId) {
+                    maxId = oldComponentId.getMaxId();
+                }
+            }
+            return new LSMDiskComponentId(minId, maxId);
+        }
+    }
+    /**
+     * Stores the ID of the given new component in its metadata: the min and max bounds computed by
+     * {@code getComponentId(oldComponents)} are written as long values under
+     * {@link ILSMDiskComponentId#COMPONENT_ID_MIN_KEY} and {@link ILSMDiskComponentId#COMPONENT_ID_MAX_KEY}.
+     *
+     * @param component
+     *            the new disk component whose metadata is updated
+     * @param oldComponents
+     *            the components the new one was built from; passed through to {@code getComponentId}
+     * @throws HyracksDataException
+     */
+    private void putComponentIdIntoMetadata(ILSMDiskComponent component, List<ILSMComponent> oldComponents)
+            throws HyracksDataException {
+        DiskComponentMetadata metadata = component.getMetadata();
+        ILSMDiskComponentId componentId = getComponentId(oldComponents);
+        metadata.put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
+                LongPointable.FACTORY.createPointable(componentId.getMinId()));
+        metadata.put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
+                LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+    }
     public synchronized void updateLastLSN(long lastLSN) {
         if (!flushRequested[writeIndex]) {
             //if the memory component pointed by writeIndex is being flushed, we should ignore this update call
@@ -144,6 +186,7 @@
         //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
         if (newComponent != null) {
             putLSNIntoMetadata(newComponent, oldComponents);
+            putComponentIdIntoMetadata(newComponent, oldComponents);
             if (opType == LSMOperationType.MERGE) {
                 LongPointable markerLsn = LongPointable.FACTORY
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/
new file mode 100644
index 0000000..e36f4a8
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.context;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import junit.framework.TestCase;
+public class CorrelatedPrefixMergePolicyTest extends TestCase {
+    private final long DEFAULT_COMPONENT_SIZE = 1L;
+    private final int MAX_COMPONENT_SIZE = 3;
+    private final int MAX_COMPONENT_COUNT = 3;
+    private final int DATASET_ID = 1;
+    /**
+     * Primary and secondary hold the same five single-ID components (5 down to 1).
+     * Notifying the policy for the secondary must not record any merge; notifying it for the
+     * primary must record components 4..1 for both indexes.
+     */
+    @Test
+    public void testBasic() {
+        try {
+            List<ILSMDiskComponentId> componentIDs =
+                    Arrays.asList(new LSMDiskComponentId(5, 5), new LSMDiskComponentId(4, 4),
+                            new LSMDiskComponentId(3, 3), new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1));
+            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+            ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+            policy.diskComponentAdded(secondary.getIndex(), false);
+            Assert.assertTrue(resultPrimaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs.isEmpty());
+            policy.diskComponentAdded(primary.getIndex(), false);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3),
+                    new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3),
+                    new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultSecondaryIDs);
+        } catch (HyracksDataException e) {
+            // an unexpected exception must fail the test instead of being swallowed
+            Assert.fail(e.getMessage());
+        }
+    }
+    /**
+     * Same scenario as the basic case, but the component IDs are intervals instead of single values.
+     * Notifying the policy for the secondary must not record any merge; notifying it for the
+     * primary must record the four oldest components for both indexes.
+     */
+    @Test
+    public void testIDIntervals() {
+        try {
+            List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
+                    new LSMDiskComponentId(10, 19));
+            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+            ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+            policy.diskComponentAdded(secondary.getIndex(), false);
+            Assert.assertTrue(resultPrimaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs.isEmpty());
+            policy.diskComponentAdded(primary.getIndex(), false);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs);
+        } catch (HyracksDataException e) {
+            // an unexpected exception must fail the test instead of being swallowed
+            Assert.fail(e.getMessage());
+        }
+    }
+    /**
+     * The secondary lacks the newest (40, 50) and the oldest (10, 19) component of the primary.
+     * After the primary notification, the primary must record its four oldest components and the
+     * secondary only the three components it actually has in that ID range.
+     */
+    @Test
+    public void testSecondaryMissing() {
+        try {
+            List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
+                    new LSMDiskComponentId(10, 19));
+            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+            List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
+                    new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24));
+            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+            ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+            policy.diskComponentAdded(secondary.getIndex(), false);
+            Assert.assertTrue(resultPrimaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs.isEmpty());
+            policy.diskComponentAdded(primary.getIndex(), false);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+                    new LSMDiskComponentId(20, 24)), resultSecondaryIDs);
+        } catch (HyracksDataException e) {
+            // an unexpected exception must fail the test instead of being swallowed
+            Assert.fail(e.getMessage());
+        }
+    }
+    /**
+     * The primary has a component whose ID is NOT_FOUND between (25, 29) and (10, 19).
+     * After the primary notification, the primary must record only the three components newer
+     * than the NOT_FOUND one, and the secondary the two components it has in that ID range.
+     */
+    @Test
+    public void testPrimaryNotFound() {
+        try {
+            List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+                    new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND),
+                    new LSMDiskComponentId(10, 19));
+            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+            List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
+                    new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24));
+            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+            ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+            policy.diskComponentAdded(secondary.getIndex(), false);
+            Assert.assertTrue(resultPrimaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs.isEmpty());
+            policy.diskComponentAdded(primary.getIndex(), false);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(40, 50), new LSMDiskComponentId(30, 35),
+                    new LSMDiskComponentId(25, 29)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29)),
+                    resultSecondaryIDs);
+        } catch (HyracksDataException e) {
+            // an unexpected exception must fail the test instead of being swallowed
+            Assert.fail(e.getMessage());
+        }
+    }
+    /**
+     * The secondary has a component whose ID is NOT_FOUND between (30, 35) and (20, 24).
+     * After the primary notification, the primary must record its four oldest components and the
+     * secondary only (30, 35) and (20, 24), i.e. without the NOT_FOUND component.
+     */
+    @Test
+    public void testSecondaryNotFound() {
+        try {
+            List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
+                    new LSMDiskComponentId(10, 19));
+            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+            List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
+                    new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND),
+                    new LSMDiskComponentId(20, 24));
+            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+            ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+            policy.diskComponentAdded(secondary.getIndex(), false);
+            Assert.assertTrue(resultPrimaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs.isEmpty());
+            policy.diskComponentAdded(primary.getIndex(), false);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(20, 24)),
+                    resultSecondaryIDs);
+        } catch (HyracksDataException e) {
+            // an unexpected exception must fail the test instead of being swallowed
+            Assert.fail(e.getMessage());
+        }
+    }
+    /**
+     * Checks that a merge scheduled for partition 0 does not touch an index living in partition 1.
+     * <p>
+     * The primary and one secondary are created for partition 0, a second secondary for partition 1.
+     * After the primary notification, the primary and the partition-0 secondary are expected to merge
+     * their four oldest components, while nothing is expected to be scheduled on the partition-1 secondary.
+     */
+    @Test
+    public void testMultiPartition() {
+        try {
+            List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
+                    new LSMDiskComponentId(10, 19));
+            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+            // Each mocked index records into its own list; otherwise the emptiness check on
+            // resultSecondaryIDs1 below could never fail.
+            List<ILSMDiskComponentId> resultSecondaryIDs1 = new ArrayList<>();
+            IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs1, 1);
+            ILSMMergePolicy policy = mockMergePolicy(primary, secondary, secondary1);
+            // A notification coming from a secondary index must not schedule any merge.
+            policy.diskComponentAdded(secondary.getIndex(), false);
+            Assert.assertTrue(resultPrimaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs.isEmpty());
+            Assert.assertTrue(resultSecondaryIDs1.isEmpty());
+            policy.diskComponentAdded(primary.getIndex(), false);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs);
+            // The index in the other partition must be left alone.
+            Assert.assertTrue(resultSecondaryIDs1.isEmpty());
+        } catch (HyracksDataException e) {
+            // Fail loudly (keeping the cause) instead of letting the test pass without its assertions.
+            throw new AssertionError("Unexpected HyracksDataException: " + e.getMessage(), e);
+        }
+    }
+    /**
+     * Creates a configured {@link CorrelatedPrefixMergePolicy} whose dataset lifecycle manager is a mock
+     * that reports exactly the given indexes for {@code DATASET_ID}.
+     *
+     * @param indexes
+     *            the index infos the mocked dataset exposes
+     * @return the policy, configured with MAX_COMPONENT_COUNT and MAX_COMPONENT_SIZE
+     */
+    private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
+        // Mocked dataset: only answers the index-info lookup.
+        Set<IndexInfo> datasetIndexes = new HashSet<>(Arrays.asList(indexes));
+        DatasetInfo datasetInfo = Mockito.mock(DatasetInfo.class);
+        Mockito.when(datasetInfo.getDatsetIndexInfos()).thenReturn(datasetIndexes);
+        // Mocked lifecycle manager: hands out the mocked dataset for DATASET_ID.
+        IDatasetLifecycleManager lifecycleManager = Mockito.mock(IDatasetLifecycleManager.class);
+        Mockito.when(lifecycleManager.getDatasetInfo(DATASET_ID)).thenReturn(datasetInfo);
+        // Policy thresholds are supplied as strings through the factory's property keys.
+        Map<String, String> config = new HashMap<>();
+        config.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE, String.valueOf(MAX_COMPONENT_SIZE));
+        config.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT, String.valueOf(MAX_COMPONENT_COUNT));
+        ILSMMergePolicy mergePolicy = new CorrelatedPrefixMergePolicy(lifecycleManager, DATASET_ID);
+        mergePolicy.configure(config);
+        return mergePolicy;
+    }
+    /**
+     * Builds an {@link IndexInfo} around a mocked {@link ILSMIndex}.
+     * <p>
+     * The mocked index exposes one mocked disk component per entry of {@code componentIDs}; each component
+     * reports the given id, {@code DEFAULT_COMPONENT_SIZE} and the state READABLE_UNWRITABLE. The accessor
+     * returned by the index records, on every {@code scheduleMerge} call, the ids of the components it was
+     * asked to merge.
+     *
+     * @param isPrimary
+     *            value returned by {@code isPrimaryIndex()} on the mocked index
+     * @param componentIDs
+     *            ids of the immutable components of the mocked index, in the order they are returned
+     * @param resultComponentIDs
+     *            output list receiving the ids of the components passed to {@code scheduleMerge}
+     * @param partition
+     *            partition passed to the {@link IndexInfo} constructor
+     * @return the index info wrapping the mocked index
+     * @throws HyracksDataException
+     *             declared by the mocked methods that are stubbed here
+     */
+    private IndexInfo mockIndex(boolean isPrimary, List<ILSMDiskComponentId> componentIDs,
+            List<ILSMDiskComponentId> resultComponentIDs, int partition) throws HyracksDataException {
+        List<ILSMDiskComponent> components = new ArrayList<>();
+        for (ILSMDiskComponentId id : componentIDs) {
+            ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
+            Mockito.when(component.getComponentId()).thenReturn(id);
+            Mockito.when(component.getComponentSize()).thenReturn(DEFAULT_COMPONENT_SIZE);
+            Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE);
+            components.add(component);
+        }
+        ILSMIndex index = Mockito.mock(ILSMIndex.class);
+        Mockito.when(index.getImmutableComponents()).thenReturn(components);
+        ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class);
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                // Argument 1 of scheduleMerge is the list of components to merge.
+                @SuppressWarnings("unchecked")
+                List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class);
+                // A plain loop lets a failure of getComponentId() propagate out of the answer (and thus
+                // out of scheduleMerge) instead of being printed and silently dropped.
+                for (ILSMDiskComponent component : mergedComponents) {
+                    resultComponentIDs.add(component.getComponentId());
+                }
+                return null;
+            }
+        }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class),
+                Mockito.anyListOf(ILSMDiskComponent.class));
+        Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class),
+                Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor);
+        Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+        return new IndexInfo(index, DATASET_ID, 0, partition);
+    }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/
index 335e84e..aed641d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/
@@ -46,4 +46,12 @@
      * @throws HyracksDataException
     void destroy() throws HyracksDataException;
+    /**
+     * Returns the component id of this disk component, as read from its metadata.
+     *
+     * @return the id of this disk component
+     * @throws HyracksDataException
+     *             if the implementation fails while obtaining the id
+     */
+    ILSMDiskComponentId getComponentId() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/
new file mode 100644
index 0000000..5d38ace
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Stores the id of the disk component, which is an interval [minId, maxId].
+ * When a disk component is formed by the flush operation, its initial minId and maxId are the same, and
+ * currently are set as the flush LSN.
+ * When a disk component is formed by the merge operation, its [minId, maxId] is set as the union of
+ * all ids of merged disk components.
+ *
+ * @author luochen
+ *
+ */
+public interface ILSMDiskComponentId {
+    /** Value of a bound for which no id is available; see {@link #notFound()}. */
+    public static final long NOT_FOUND = -1;
+    // The key bytes are encoded explicitly as UTF-8 so that they do not depend on the JVM's
+    // platform-default charset.
+    /** Metadata key under which the lower bound of the id is stored. */
+    public static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
+            new MutableArrayValueReference("Component_Id_Min".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    /** Metadata key under which the upper bound of the id is stored. */
+    public static final MutableArrayValueReference COMPONENT_ID_MAX_KEY =
+            new MutableArrayValueReference("Component_Id_Max".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    /**
+     * @return the lower bound of the id interval
+     */
+    long getMinId();
+    /**
+     * @return the upper bound of the id interval
+     */
+    long getMaxId();
+    /**
+     * Tells whether this id is missing.
+     *
+     * @return {@code true} if either bound equals {@link #NOT_FOUND}
+     */
+    default boolean notFound() {
+        return getMinId() == NOT_FOUND || getMaxId() == NOT_FOUND;
+    }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/
index 508a6cc..9e5a230 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/
@@ -22,7 +22,9 @@
 public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent {
@@ -96,4 +98,14 @@
     public DiskComponentMetadata getMetadata() {
         return metadata;
+    /**
+     * Builds the id of this component from the two id entries of its metadata.
+     * <p>
+     * {@link ILSMDiskComponentId#NOT_FOUND} is passed as the third argument of both lookups
+     * (presumably the fallback used when the key is absent -- confirm against ComponentMetadataUtil).
+     */
+    @Override
+    public ILSMDiskComponentId getComponentId() throws HyracksDataException {
+        //TODO: do we need to throw an exception when ID is not found?
+        final long fallback = ILSMDiskComponentId.NOT_FOUND;
+        final long lowerBound =
+                ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY, fallback);
+        final long upperBound =
+                ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY, fallback);
+        return new LSMDiskComponentId(lowerBound, upperBound);
+    }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/
new file mode 100644
index 0000000..f448c84
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+public class LSMDiskComponentId implements ILSMDiskComponentId {
+    private final long minId;
+    private final long maxId;
+    public LSMDiskComponentId(long minId, long maxId) {
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+    @Override
+    public long getMinId() {
+        return this.minId;
+    }
+    @Override
+    public long getMaxId() {
+        return this.maxId;
+    }
+    @Override
+    public String toString() {
+        return "[" + minId + "," + maxId + "]";
+    }
+    @Override
+    public int hashCode() {
+        return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
+    }
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof LSMDiskComponentId)) {
+            return false;
+        }
+        LSMDiskComponentId other = (LSMDiskComponentId) obj;
+        if (maxId != other.maxId) {
+            return false;
+        }
+        if (minId != other.minId) {
+            return false;
+        }
+        return true;
+    }