Implemented Disk Component Alignment Based on IDs
- Added IDs (using the LSN) for disk components. When a disk component is
flushed, its initial ID is set to the flush LSN. When components are merged,
the resulting component's ID is the union of the IDs of the merged components.
- Changed the correlated merge policy to correlate the disk components of
the primary and secondary indexes using these IDs.
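For example, flushing a memory component at LSN 100 produces a disk
component with ID [100, 100], while merging components with IDs [10, 19],
[20, 24], and [25, 29] produces a component with ID [10, 29]. A minimal
sketch of that union, using the LSMDiskComponentId class introduced by this
change (the helper class and method name below are illustrative only, not
part of the patch):

    import java.util.List;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
    import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;

    class ComponentIdUnionSketch {
        // Union of the ID intervals of the components being merged; mirrors the
        // merge branch of AbstractLSMIOOperationCallback.getComponentId.
        static ILSMDiskComponentId union(List<ILSMDiskComponentId> ids) {
            long minId = Long.MAX_VALUE;
            long maxId = Long.MIN_VALUE;
            for (ILSMDiskComponentId id : ids) {
                minId = Math.min(minId, id.getMinId());
                maxId = Math.max(maxId, id.getMaxId());
            }
            return new LSMDiskComponentId(minId, maxId);
        }
    }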
Change-Id: I768ee9ac0a8d3c99c631086093a6b778b2e7588e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1761
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index cd1d95e..d28b991 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,9 +32,11 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
@@ -57,66 +60,30 @@
// all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component.
// 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule
// a merge all of the current candidates into a new single component.
- List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
- // Reverse the components order so that we look at components from oldest to newest.
- Collections.reverse(immutableComponents);
- for (ILSMDiskComponent c : immutableComponents) {
- if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ if (fullMergeIsRequested) {
+ //a full merge request is handled by each index separately, since it is possible that,
+ //when the primary index wants to send full merge requests to all secondaries,
+ //one secondary index is currently being merged and its request cannot be scheduled
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ if (!areComponentsReadableUnwritableState(immutableComponents)) {
return;
}
- }
- if (fullMergeIsRequested) {
+
ILSMIndexAccessor accessor =
index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleFullMerge(index.getIOOperationCallback());
return;
}
+
if (!index.isPrimaryIndex()) {
return;
}
- long totalSize = 0;
- int startIndex = -1;
- int minNumComponents = Integer.MAX_VALUE;
-
- Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetIndexes();
- for (ILSMIndex lsmIndex : indexes) {
- minNumComponents = Math.min(minNumComponents, lsmIndex.getImmutableComponents().size());
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ if (!areComponentsReadableUnwritableState(immutableComponents)) {
+ return;
}
-
- for (int i = 0; i < minNumComponents; i++) {
- ILSMComponent c = immutableComponents.get(i);
- long componentSize = ((ILSMDiskComponent) c).getComponentSize();
- if (componentSize > maxMergableComponentSize) {
- startIndex = i;
- totalSize = 0;
- continue;
- }
- totalSize += componentSize;
- boolean isLastComponent = i + 1 == minNumComponents ? true : false;
- if (totalSize > maxMergableComponentSize
- || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
-
- for (ILSMIndex lsmIndex : indexes) {
- List<ILSMDiskComponent> reversedImmutableComponents =
- new ArrayList<>(lsmIndex.getImmutableComponents());
- // Reverse the components order so that we look at components from oldest to newest.
- Collections.reverse(reversedImmutableComponents);
-
- List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
- for (int j = startIndex + 1; j <= i; j++) {
- mergableComponents.add(reversedImmutableComponents.get(j));
- }
- // Reverse the components order back to its original order
- Collections.reverse(mergableComponents);
-
- ILSMIndexAccessor accessor =
- lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
- }
- break;
- }
- }
+ scheduleMerge(index);
}
@Override
@@ -127,9 +94,215 @@
Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT));
}
+ /**
+ * Adopts logic similar to {@link PrefixMergePolicy} to decide whether merge operations are lagging
+ *
+ * @throws HyracksDataException
+ */
@Override
- public boolean isMergeLagging(ILSMIndex index) {
- //TODO implement properly according to the merge policy
+ public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException {
+ /**
+ * case 1.
+ * if mergableImmutableComponentCount < threshold,
+ * the merge operation is not lagging ==> return false.
+ * case 2.
+ * if a) mergableImmutableComponentCount >= threshold && b) there is an ongoing merge,
+ * the merge operation is lagging. ==> return true.
+ * case 3. *SPECIAL CASE*
+ * if a) mergableImmutableComponentCount >= threshold && b) there is *NO* ongoing merge,
+ * the merge operation is lagging. ==> *schedule a merge operation* and then return true.
+ * This is a special case that requires scheduling a merge operation.
+ * Otherwise, all flush operations will hang.
+ * This case can happen in the following situation:
+ * The system may crash when
+ * condition 1) mergableImmutableComponentCount >= threshold and
+ * condition 2) a merge operation is ongoing.
+ * After the system is recovered, condition 1) still holds.
+ * If there are flush operations on the same dataset partition after the recovery,
+ * none of these flush operations can proceed since there is no ongoing merge and
+ * no new merge will be scheduled either in this situation.
+ * Note that for case 3, we only let the primary index schedule merge operations on behalf
+ * of all indexes.
+ */
+
+ List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents();
+ int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
+
+ // [case 1]
+ if (mergableImmutableComponentCount < maxToleranceComponentCount) {
+ return false;
+ }
+
+ boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+ if (isMergeOngoing) {
+ // [case 2]
+ return true;
+ }
+
+ if (index.isPrimaryIndex()) {
+ // [case 3]
+ // make sure that all components are in the READABLE_UNWRITABLE state.
+ if (!areComponentsReadableUnwritableState(immutableComponents)) {
+ throw new IllegalStateException();
+ }
+ // schedule a merge operation
+ boolean isMergeTriggered = scheduleMerge(index);
+ if (!isMergeTriggered) {
+ throw new IllegalStateException();
+ }
+ return true;
+ } else {
+ //[case 3]
+ //if the index is a secondary, ignore the merge request (since the merge should be
+ //triggered by the primary) and simply treat it as not lagging.
+ return false;
+ }
+ }
+
+ private boolean scheduleMerge(ILSMIndex index) throws HyracksDataException {
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+ Collections.reverse(immutableComponents);
+
+ long totalSize = 0;
+ int startIndex = -1;
+
+ int numComponents = immutableComponents.size();
+
+ for (int i = 0; i < numComponents; i++) {
+ ILSMComponent c = immutableComponents.get(i);
+ long componentSize = ((ILSMDiskComponent) c).getComponentSize();
+ if (componentSize > maxMergableComponentSize || ((ILSMDiskComponent) c).getComponentId().notFound()) {
+ startIndex = i;
+ totalSize = 0;
+ continue;
+ }
+ totalSize += componentSize;
+ boolean isLastComponent = i + 1 == numComponents;
+ if (totalSize > maxMergableComponentSize
+ || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
+ //merge disk components from startIndex+1 to i
+ long minID = Long.MAX_VALUE;
+ long maxID = Long.MIN_VALUE;
+ for (int j = startIndex + 1; j <= i; j++) {
+ ILSMDiskComponentId id = immutableComponents.get(j).getComponentId();
+ if (minID > id.getMinId()) {
+ minID = id.getMinId();
+ }
+ if (maxID < id.getMaxId()) {
+ maxID = id.getMaxId();
+ }
+ }
+ Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetIndexInfos();
+ int partition = getIndexPartition(index, indexInfos);
+ triggerScheduledMerge(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition)
+ .collect(Collectors.toSet()));
+ return true;
+ }
+ }
return false;
}
+
+ /**
+ * Submits merge requests for all disk components within [minID, maxID]
+ * of all indexes of a given dataset partition
+ *
+ * @param minID
+ * @param maxID
+ * @param indexInfos
+ * @throws HyracksDataException
+ */
+ private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException {
+ for (IndexInfo info : indexInfos) {
+ ILSMIndex lsmIndex = info.getIndex();
+
+ List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getImmutableComponents());
+ if (isMergeOngoing(immutableComponents)) {
+ continue;
+ }
+ List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
+ for (ILSMDiskComponent component : immutableComponents) {
+ ILSMDiskComponentId id = component.getComponentId();
+ if (!id.notFound()) {
+ if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
+ mergableComponents.add(component);
+ }
+ if (id.getMaxId() < minID) {
+ //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs)
+ //if component.maxID < minID, we can safely skip the remaining disk components in the list
+ break;
+ }
+ }
+ }
+ ILSMIndexAccessor accessor =
+ lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
+ }
+ }
+
+ /**
+ * Returns the number of mergable components among the given list
+ * of immutable components, which must be ordered from the latest component to older ones.
+ * The caller is responsible for ensuring this order.
+ *
+ * @param immutableComponents
+ * @return the number of mergable components
+ * @throws HyracksDataException
+ */
+ private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents)
+ throws HyracksDataException {
+ int count = 0;
+ for (ILSMComponent c : immutableComponents) {
+ long componentSize = ((ILSMDiskComponent) c).getComponentSize();
+ //stop when the first non-mergable component is found.
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize
+ || ((ILSMDiskComponent) c).getComponentId().notFound()) {
+ break;
+ }
+ ++count;
+ }
+ return count;
+ }
+
+ /**
+ * Returns whether there is an ongoing merge operation by checking
+ * the state of each of the given components.
+ *
+ * @param immutableComponents
+ * @return true if there is an ongoing merge operation, false otherwise.
+ */
+ private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
+ int size = immutableComponents.size();
+ for (int i = 0; i < size; i++) {
+ if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Checks whether all given components are in the READABLE_UNWRITABLE state
+ *
+ * @param immutableComponents
+ * @return true if all components are in the READABLE_UNWRITABLE state, false otherwise.
+ */
+ private boolean areComponentsReadableUnwritableState(List<ILSMDiskComponent> immutableComponents) {
+ for (ILSMComponent c : immutableComponents) {
+ if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
+ for (IndexInfo info : indexInfos) {
+ if (info.getIndex() == index) {
+ return info.getPartition();
+ }
+ }
+ return -1;
+ }
}
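As a reading aid, the selection performed by triggerScheduledMerge above can be
summarized as follows: given the [minID, maxID] interval computed from the primary
index's mergable components, each index of the same partition merges exactly those
disk components whose IDs fall inside that interval. A sketch of that selection
(illustrative only, assuming the components are ordered from newest, with the
largest IDs, to oldest, as getImmutableComponents() returns them):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;

    class ComponentRangeSelectionSketch {
        // Pick the disk components of one index whose IDs lie entirely within [minID, maxID].
        static List<ILSMDiskComponent> selectInRange(List<ILSMDiskComponent> components, long minID, long maxID)
                throws HyracksDataException {
            List<ILSMDiskComponent> mergable = new ArrayList<>();
            for (ILSMDiskComponent component : components) {
                ILSMDiskComponentId id = component.getComponentId();
                if (id.notFound()) {
                    continue; // components written before this change carry no ID
                }
                if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
                    mergable.add(component);
                }
                if (id.getMaxId() < minID) {
                    break; // everything after this is older than the requested range
                }
            }
            return mergable;
        }
    }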
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 6a2cc56..71d4a96 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -76,6 +76,17 @@
return datasetIndexes;
}
+ public synchronized Set<IndexInfo> getDatasetIndexInfos() {
+ Set<IndexInfo> infos = new HashSet<>();
+ for (IndexInfo iInfo : getIndexes().values()) {
+ if (iInfo.isOpen()) {
+ infos.add(iInfo);
+ }
+ }
+
+ return infos;
+ }
+
@Override
public int compareTo(DatasetInfo i) {
// sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 9529366..51a535a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -93,12 +93,12 @@
public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
+ LocalResource resource = resourceRepository.get(resourcePath);
DatasetResource datasetResource = datasets.get(did);
if (datasetResource == null) {
datasetResource = getDatasetLifecycle(did);
}
- datasetResource.register(resourceID, index);
+ datasetResource.register(resource, index);
}
public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index a880ce2..f2f3b93 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -20,9 +20,11 @@
import java.util.Map;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
/**
* A dataset can be in one of two states { EVICTED , LOADED }.
@@ -41,8 +43,7 @@
private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
- public DatasetResource(DatasetInfo datasetInfo,
- PrimaryIndexOperationTracker datasetPrimaryOpTracker,
+ public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker,
DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
this.datasetInfo = datasetInfo;
this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
@@ -86,7 +87,8 @@
return (iInfo == null) ? null : iInfo.getIndex();
}
- public void register(long resourceID, IIndex index) throws HyracksDataException {
+ public void register(LocalResource resource, IIndex index) throws HyracksDataException {
+ long resourceID = resource.getId();
if (!datasetInfo.isRegistered()) {
synchronized (datasetInfo) {
if (!datasetInfo.isRegistered()) {
@@ -102,8 +104,8 @@
if (index == null) {
throw new HyracksDataException("Attempt to register a null index");
}
- datasetInfo.getIndexes().put(resourceID,
- new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(), resourceID));
+ datasetInfo.getIndexes().put(resourceID, new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(),
+ resourceID, ((DatasetLocalResource) resource.getResource()).getPartition()));
}
public DatasetInfo getDatasetInfo() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
index 34ccce0..9eb5b6c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -22,13 +22,15 @@
public class IndexInfo extends Info {
private final ILSMIndex index;
- private final long resourceId;
private final int datasetId;
+ private final long resourceId;
+ private final int partition;
- public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
+ public IndexInfo(ILSMIndex index, int datasetId, long resourceId, int partition) {
this.index = index;
this.datasetId = datasetId;
this.resourceId = resourceId;
+ this.partition = partition;
}
public ILSMIndex getIndex() {
@@ -39,6 +41,10 @@
return resourceId;
}
+ public int getPartition() {
+ return partition;
+ }
+
public int getDatasetId() {
return datasetId;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index ec7df85..f357aea 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -20,6 +20,8 @@
package org.apache.asterix.common.ioopcallbacks;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
@@ -27,13 +29,17 @@
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
+ private static final Logger logger = Logger.getLogger(AbstractLSMIOOperationCallback.class.getName());
+
public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes());
public static final long INVALID = -1L;
@@ -106,6 +112,42 @@
return pointable.getLength() == 0 ? INVALID : pointable.longValue();
}
+ private ILSMDiskComponentId getComponentId(List<ILSMComponent> oldComponents) throws HyracksDataException {
+ if (oldComponents == null) {
+ //if oldComponents == null, then getComponentLSN would treat it as a flush operation,
+ //and return the LSN for the flushed component
+ long id = getComponentLSN(null);
+ if (id == 0) {
+ logger.log(Level.WARNING, "Flushing a memory component without setting the LSN");
+ id = ILSMDiskComponentId.NOT_FOUND;
+ }
+ return new LSMDiskComponentId(id, id);
+ } else {
+ long minId = Long.MAX_VALUE;
+ long maxId = Long.MIN_VALUE;
+ for (ILSMComponent oldComponent : oldComponents) {
+ ILSMDiskComponentId oldComponentId = ((ILSMDiskComponent) oldComponent).getComponentId();
+ if (oldComponentId.getMinId() < minId) {
+ minId = oldComponentId.getMinId();
+ }
+ if (oldComponentId.getMaxId() > maxId) {
+ maxId = oldComponentId.getMaxId();
+ }
+ }
+ return new LSMDiskComponentId(minId, maxId);
+ }
+ }
+
+ private void putComponentIdIntoMetadata(ILSMDiskComponent component, List<ILSMComponent> oldComponents)
+ throws HyracksDataException {
+ DiskComponentMetadata metadata = component.getMetadata();
+ ILSMDiskComponentId componentId = getComponentId(oldComponents);
+ metadata.put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
+ LongPointable.FACTORY.createPointable(componentId.getMinId()));
+ metadata.put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
+ LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+ }
+
public synchronized void updateLastLSN(long lastLSN) {
if (!flushRequested[writeIndex]) {
//if the memory component pointed by writeIndex is being flushed, we should ignore this update call
@@ -144,6 +186,7 @@
//TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
if (newComponent != null) {
putLSNIntoMetadata(newComponent, oldComponents);
+ putComponentIdIntoMetadata(newComponent, oldComponents);
if (opType == LSMOperationType.MERGE) {
LongPointable markerLsn = LongPointable.FACTORY
.createPointable(ComponentMetadataUtil.getLong(oldComponents.get(0).getMetadata(),
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
new file mode 100644
index 0000000..e36f4a8
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.context;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
+import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import junit.framework.TestCase;
+
+public class CorrelatedPrefixMergePolicyTest extends TestCase {
+
+ private final long DEFAULT_COMPONENT_SIZE = 1L;
+
+ private final int MAX_COMPONENT_SIZE = 3;
+
+ private final int MAX_COMPONENT_COUNT = 3;
+
+ private final int DATASET_ID = 1;
+
+ @Test
+ public void testBasic() {
+ try {
+ List<ILSMDiskComponentId> componentIDs =
+ Arrays.asList(new LSMDiskComponentId(5, 5), new LSMDiskComponentId(4, 4),
+ new LSMDiskComponentId(3, 3), new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1));
+
+ List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+
+ List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+
+ ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+ policy.diskComponentAdded(secondary.getIndex(), false);
+ Assert.assertTrue(resultPrimaryIDs.isEmpty());
+ Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+ policy.diskComponentAdded(primary.getIndex(), false);
+
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3),
+ new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3),
+ new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultSecondaryIDs);
+
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testIDIntervals() {
+ try {
+ List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+ new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
+ new LSMDiskComponentId(10, 19));
+
+ List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+
+ List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+
+ ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+ policy.diskComponentAdded(secondary.getIndex(), false);
+ Assert.assertTrue(resultPrimaryIDs.isEmpty());
+ Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+ policy.diskComponentAdded(primary.getIndex(), false);
+
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+ new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+ new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs);
+
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSecondaryMissing() {
+ try {
+ List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+ new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
+ new LSMDiskComponentId(10, 19));
+ List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+
+ List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
+ new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24));
+ List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+
+ ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+
+ policy.diskComponentAdded(secondary.getIndex(), false);
+ Assert.assertTrue(resultPrimaryIDs.isEmpty());
+ Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+ policy.diskComponentAdded(primary.getIndex(), false);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+ new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+ new LSMDiskComponentId(20, 24)), resultSecondaryIDs);
+
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPrimaryNotFound() {
+ try {
+ List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+ new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+ new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND),
+ new LSMDiskComponentId(10, 19));
+ List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+
+ List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
+ new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24));
+ List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+
+ ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+
+ policy.diskComponentAdded(secondary.getIndex(), false);
+ Assert.assertTrue(resultPrimaryIDs.isEmpty());
+ Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+ policy.diskComponentAdded(primary.getIndex(), false);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(40, 50), new LSMDiskComponentId(30, 35),
+ new LSMDiskComponentId(25, 29)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29)),
+ resultSecondaryIDs);
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSecondaryNotFound() {
+ try {
+ List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+ new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
+ new LSMDiskComponentId(10, 19));
+ List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
+
+ List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
+ new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND),
+ new LSMDiskComponentId(20, 24));
+ List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
+
+ ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
+
+ policy.diskComponentAdded(secondary.getIndex(), false);
+ Assert.assertTrue(resultPrimaryIDs.isEmpty());
+ Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+ policy.diskComponentAdded(primary.getIndex(), false);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+ new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(20, 24)),
+ resultSecondaryIDs);
+
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultiPartition() {
+ try {
+ List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
+ new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
+ new LSMDiskComponentId(10, 19));
+
+ List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
+
+ List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
+
+ List<ILSMDiskComponentId> resultSecondaryIDs1 = new ArrayList<>();
+ IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs1, 1);
+
+ ILSMMergePolicy policy = mockMergePolicy(primary, secondary, secondary1);
+ policy.diskComponentAdded(secondary.getIndex(), false);
+ Assert.assertTrue(resultPrimaryIDs.isEmpty());
+ Assert.assertTrue(resultSecondaryIDs.isEmpty());
+
+ policy.diskComponentAdded(primary.getIndex(), false);
+
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+ new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
+ new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs);
+ Assert.assertTrue(resultSecondaryIDs1.isEmpty());
+ } catch (HyracksDataException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT, String.valueOf(MAX_COMPONENT_COUNT));
+ properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE, String.valueOf(MAX_COMPONENT_SIZE));
+
+ Set<IndexInfo> indexInfos = new HashSet<>();
+ for (IndexInfo info : indexes) {
+ indexInfos.add(info);
+ }
+
+ DatasetInfo dsInfo = Mockito.mock(DatasetInfo.class);
+ Mockito.when(dsInfo.getDatasetIndexInfos()).thenReturn(indexInfos);
+
+ IDatasetLifecycleManager manager = Mockito.mock(IDatasetLifecycleManager.class);
+ Mockito.when(manager.getDatasetInfo(DATASET_ID)).thenReturn(dsInfo);
+
+ ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(manager, DATASET_ID);
+ policy.configure(properties);
+ return policy;
+ }
+
+ private IndexInfo mockIndex(boolean isPrimary, List<ILSMDiskComponentId> componentIDs,
+ List<ILSMDiskComponentId> resultComponentIDs, int partition) throws HyracksDataException {
+ List<ILSMDiskComponent> components = new ArrayList<>();
+ for (ILSMDiskComponentId id : componentIDs) {
+ ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
+ Mockito.when(component.getComponentId()).thenReturn(id);
+ Mockito.when(component.getComponentSize()).thenReturn(DEFAULT_COMPONENT_SIZE);
+ Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE);
+ components.add(component);
+ }
+
+ ILSMIndex index = Mockito.mock(ILSMIndex.class);
+ Mockito.when(index.getImmutableComponents()).thenReturn(components);
+
+ ILSMIndexAccessor accessor = Mockito.mock(ILSMIndexAccessor.class);
+ Mockito.doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class);
+ mergedComponents.forEach(component -> {
+ try {
+ resultComponentIDs.add(component.getComponentId());
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ });
+ return null;
+ }
+ }).when(accessor).scheduleMerge(Mockito.any(ILSMIOOperationCallback.class),
+ Mockito.anyListOf(ILSMDiskComponent.class));
+
+ Mockito.when(index.createAccessor(Mockito.any(IModificationOperationCallback.class),
+ Mockito.any(ISearchOperationCallback.class))).thenReturn(accessor);
+ Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+
+ return new IndexInfo(index, DATASET_ID, 0, partition);
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 335e84e..aed641d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -46,4 +46,12 @@
* @throws HyracksDataException
*/
void destroy() throws HyracksDataException;
+
+ /**
+ * Returns the component id of this disk component, read from its metadata
+ *
+ * @return the component id
+ * @throws HyracksDataException
+ */
+ ILSMDiskComponentId getComponentId() throws HyracksDataException;
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
new file mode 100644
index 0000000..5d38ace
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+
+/**
+ * Stores the id of a disk component, which is an interval [minId, maxId].
+ * When a disk component is formed by a flush operation, its initial minId and maxId are the same, and
+ * currently are set to the flush LSN.
+ * When a disk component is formed by a merge operation, its [minId, maxId] is set to the union of
+ * the ids of all merged disk components.
+ *
+ * @author luochen
+ *
+ */
+public interface ILSMDiskComponentId {
+
+ public static final long NOT_FOUND = -1;
+
+ public static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
+ new MutableArrayValueReference("Component_Id_Min".getBytes());
+
+ public static final MutableArrayValueReference COMPONENT_ID_MAX_KEY =
+ new MutableArrayValueReference("Component_Id_Max".getBytes());
+
+ long getMinId();
+
+ long getMaxId();
+
+ default boolean notFound() {
+ return getMinId() == NOT_FOUND || getMaxId() == NOT_FOUND;
+ }
+
+}
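A minimal usage sketch of the new interface (illustrative only, not part of the
patch): a flushed component carries a point interval, a merged component carries
the union of the merged intervals, and a component whose metadata predates this
change reports notFound().

    import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
    import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;

    class DiskComponentIdUsageSketch {
        public static void main(String[] args) {
            // A flushed component: minId == maxId == the flush LSN.
            ILSMDiskComponentId flushed = new LSMDiskComponentId(100, 100);
            // A merged component: the union of the merged components' IDs.
            ILSMDiskComponentId merged = new LSMDiskComponentId(10, 35);
            // A component without ID metadata (e.g. written before this change).
            ILSMDiskComponentId legacy =
                    new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND);

            System.out.println(flushed);            // prints [100,100]
            System.out.println(merged.getMaxId());  // prints 35
            System.out.println(legacy.notFound());  // prints true
        }
    }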
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 508a6cc..9e5a230 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -22,7 +22,9 @@
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.utils.ComponentMetadataUtil;
public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent {
@@ -96,4 +98,14 @@
public DiskComponentMetadata getMetadata() {
return metadata;
}
+
+ @Override
+ public ILSMDiskComponentId getComponentId() throws HyracksDataException {
+ long minID = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
+ ILSMDiskComponentId.NOT_FOUND);
+ long maxID = ComponentMetadataUtil.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
+ ILSMDiskComponentId.NOT_FOUND);
+ //TODO: do we need to throw an exception when ID is not found?
+ return new LSMDiskComponentId(minID, maxID);
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
new file mode 100644
index 0000000..f448c84
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
+
+public class LSMDiskComponentId implements ILSMDiskComponentId {
+
+ private final long minId;
+
+ private final long maxId;
+
+ public LSMDiskComponentId(long minId, long maxId) {
+ this.minId = minId;
+ this.maxId = maxId;
+ }
+
+ @Override
+ public long getMinId() {
+ return this.minId;
+ }
+
+ @Override
+ public long getMaxId() {
+ return this.maxId;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + minId + "," + maxId + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof LSMDiskComponentId)) {
+ return false;
+ }
+ LSMDiskComponentId other = (LSMDiskComponentId) obj;
+ if (maxId != other.maxId) {
+ return false;
+ }
+ if (minId != other.minId) {
+ return false;
+ }
+ return true;
+ }
+
+}