[ASTERIXDB-2115] Add Component Ids to LSM Indexes
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add LSMComponentId to all LSM components. Component Ids are managed
through IO operation callbacks.
- For a memory component, its ID is reset every time it's recycled.
- For a disk component, its ID is copied from the source component(s)
during flush/merge
- For indexes of a dataset, we need to guarantee that all their memory
components receive the same ID. This is achieved using a shared
component Id generator.
- Fix memory component recycled callback, make sure it's called only
when we've indeed recycled the memory component
A design wiki for this patch: https://cwiki.apache.org/confluence/display/
ASTERIXDB/Component+Id-based+secondary-to-primary+index+acceleration
Change-Id: I8aec6261a84a0729ce35f4b1cb708be299ddb98d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2025
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d6457f2..494eb65 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -993,8 +993,11 @@
// #. create the index artifact in NC.
runJob(hcc, spec, jobFlags);
- // #. flush the internal dataset for correlated policy
- if (ds.isCorrelated() && ds.getDatasetType() == DatasetType.INTERNAL) {
+ // #. flush the internal dataset
+ // We need this to guarantee the correctness of component Id acceleration for secondary-to-primary index.
+ // Otherwise, the new secondary index component would correspond to a partial memory component
+ // of the primary index, which is incorrect.
+ if (ds.getDatasetType() == DatasetType.INTERNAL) {
FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName());
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 6bff50d..00b185d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -197,6 +197,7 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
@@ -204,6 +205,7 @@
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
lsmAccessor.deleteComponents(pred);
searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
} catch (Throwable e) {
@@ -246,6 +248,7 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
@@ -270,6 +273,7 @@
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
lsmAccessor.deleteComponents(pred);
searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
} catch (Throwable e) {
@@ -316,6 +320,7 @@
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
// rollback a memory component
lsmAccessor.deleteComponents(
c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
@@ -335,6 +340,7 @@
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
lsmAccessor.deleteComponents(pred);
// now that the rollback has completed, we will unblock the search
lsmBtree.addSearchCallback(sem -> sem.release());
@@ -731,7 +737,7 @@
}
private class Rollerback {
- private Thread task;
+ private final Thread task;
private Exception failure;
public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) {
@@ -740,6 +746,7 @@
@Override
public void run() {
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
try {
lsmAccessor.deleteComponents(predicate);
} catch (HyracksDataException e) {
@@ -760,7 +767,7 @@
}
private class Searcher {
- private ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Future<Boolean> task;
private volatile boolean entered = false;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index 893b428..e0502de 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -62,6 +63,6 @@
@Override
public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException {
- return TestLsmBtreeIoOpCallbackFactory.INSTANCE;
+ return new TestLsmBtreeIoOpCallbackFactory(new DatasetLSMComponentIdGeneratorFactory(getDatasetId()));
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index 142bcc5..fa37c20 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -18,19 +18,20 @@
*/
package org.apache.asterix.test.dataflow;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
-public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class TestLsmBtreeIoOpCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
- public static TestLsmBtreeIoOpCallbackFactory INSTANCE = new TestLsmBtreeIoOpCallbackFactory();
private static volatile int completedFlushes = 0;
private static volatile int completedMerges = 0;
private static volatile int rollbackFlushes = 0;
@@ -38,7 +39,8 @@
private static volatile int failedFlushes = 0;
private static volatile int failedMerges = 0;
- private TestLsmBtreeIoOpCallbackFactory() {
+ public TestLsmBtreeIoOpCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+ super(idGeneratorFactory);
}
@Override
@@ -50,7 +52,7 @@
// Whenever this is called, it resets the counter
// However, the counters for the failed operations are never reset since we expect them
// To be always 0
- return new TestLsmBtreeIoOpCallback(index);
+ return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator());
}
public int getTotalFlushes() {
@@ -90,14 +92,14 @@
}
public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
- public TestLsmBtreeIoOpCallback(ILSMIndex index) {
- super(index);
+ public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
+ super(index, idGenerator);
}
@Override
public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
super.afterFinalize(opType, newComponent);
- synchronized (INSTANCE) {
+ synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
if (newComponent != null) {
if (newComponent == EmptyComponent.INSTANCE) {
if (opType == LSMIOOperationType.FLUSH) {
@@ -115,7 +117,7 @@
} else {
recordFailure(opType);
}
- INSTANCE.notifyAll();
+ TestLsmBtreeIoOpCallbackFactory.this.notifyAll();
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index a32d4dc..41c5ade 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
@@ -79,6 +80,14 @@
PrimaryIndexOperationTracker getOperationTracker(int datasetId);
/**
+ * creates (if necessary) and returns the component Id generator of a dataset.
+ *
+ * @param datasetId
+ * @return
+ */
+ ILSMComponentIdGenerator getComponentIdGenerator(int datasetId);
+
+ /**
* creates (if necessary) and returns the dataset virtual buffer caches.
*
* @param datasetId
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index b877cfb..e5fc998 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -29,11 +29,14 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy {
@@ -86,27 +89,26 @@
//nothing to merge
return false;
}
- long minID = immutableComponents.get(mergeableIndexes.getLeft()).getComponentId().getMinId();
- long maxID = immutableComponents.get(mergeableIndexes.getRight()).getComponentId().getMaxId();
-
+ ILSMComponent leftComponent = immutableComponents.get(mergeableIndexes.getLeft());
+ ILSMComponent rightComponent = immutableComponents.get(mergeableIndexes.getRight());
+ ILSMComponentId targetId = LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId());
Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
int partition = getIndexPartition(index, indexInfos);
- triggerScheduledMerge(minID, maxID,
+ triggerScheduledMerge(targetId,
indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet()));
return true;
}
/**
- * Submit merge requests for all disk components within [minID, maxID]
+ * Submit merge requests for all disk components within the range specified by targetId
* of all indexes of a given dataset in the given partition
*
- * @param minID
- * @param maxID
- * @param partition
+ * @param targetId
* @param indexInfos
* @throws HyracksDataException
*/
- private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException {
+ private void triggerScheduledMerge(ILSMComponentId targetId, Set<IndexInfo> indexInfos)
+ throws HyracksDataException {
for (IndexInfo info : indexInfos) {
ILSMIndex lsmIndex = info.getIndex();
@@ -116,13 +118,13 @@
}
List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
for (ILSMDiskComponent component : immutableComponents) {
- ILSMDiskComponentId id = component.getComponentId();
- if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
+ ILSMComponentId id = component.getId();
+ IdCompareResult cmp = targetId.compareTo(id);
+ if (cmp == IdCompareResult.INCLUDE) {
mergableComponents.add(component);
- }
- if (id.getMaxId() < minID) {
+ } else if (cmp == IdCompareResult.GREATER_THAN) {
//disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs)
- //if the component.maxID < minID, we can safely skip the rest disk components in the list
+ // if targetId>component.Id, we can safely skip the rest disk components in the list
break;
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..7b8397c
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.context;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+
+/**
+ * This factory implementation is used by AsterixDB layer so that indexes of a dataset (/partition)
+ * use the same Id generator. This guarantees their memory components would receive the same Id upon
+ * activation.
+ *
+ */
+public class DatasetLSMComponentIdGeneratorFactory implements ILSMComponentIdGeneratorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final int datasetId;
+
+ public DatasetLSMComponentIdGeneratorFactory(int datasetId) {
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+ IDatasetLifecycleManager dslcManager =
+ ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
+ return dslcManager.getComponentIdGenerator(datasetId);
+ }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 6282509..1e99fb5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -43,11 +43,13 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
@@ -233,10 +235,12 @@
dsr = datasets.get(did);
if (dsr == null) {
DatasetInfo dsInfo = new DatasetInfo(did);
- PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo);
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+ PrimaryIndexOperationTracker opTracker =
+ new PrimaryIndexOperationTracker(did, logManager, dsInfo, idGenerator);
DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
memoryManager.getNumPages(did), numPartitions);
- dsr = new DatasetResource(dsInfo, opTracker, vbcs);
+ dsr = new DatasetResource(dsInfo, opTracker, vbcs, idGenerator);
datasets.put(did, dsr);
}
return dsr;
@@ -319,6 +323,11 @@
return datasets.get(datasetId).getOpTracker();
}
+ @Override
+ public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) {
+ return datasets.get(datasetId).getIdGenerator();
+ }
+
private void validateDatasetLifecycleManagerState() throws HyracksDataException {
if (stopped) {
throw new HyracksDataException(DatasetLifecycleManager.class.getSimpleName() + " was stopped.");
@@ -404,6 +413,9 @@
}
}
+ ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
+ idGenerator.refresh();
+
if (asyncFlush) {
for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 79ae1da..f6e2b0d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -23,6 +23,7 @@
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.LocalResource;
@@ -42,12 +43,15 @@
private final DatasetInfo datasetInfo;
private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
+ private final ILSMComponentIdGenerator datasetComponentIdGenerator;
public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker,
- DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
+ DatasetVirtualBufferCaches datasetVirtualBufferCaches,
+ ILSMComponentIdGenerator datasetComponentIdGenerator) {
this.datasetInfo = datasetInfo;
this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
+ this.datasetComponentIdGenerator = datasetComponentIdGenerator;
}
public boolean isRegistered() {
@@ -116,6 +120,10 @@
return datasetPrimaryOpTracker;
}
+ public ILSMComponentIdGenerator getIdGenerator() {
+ return datasetComponentIdGenerator;
+ }
+
@Override
public int compareTo(DatasetResource o) {
return datasetInfo.compareTo(o.datasetInfo);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 01e33a7..6f35a3d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
@@ -45,13 +46,16 @@
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
private final ILogManager logManager;
+ private final ILSMComponentIdGenerator idGenerator;
private boolean flushOnExit = false;
private boolean flushLogCreated = false;
- public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo) {
+ public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+ ILSMComponentIdGenerator idGenerator) {
super(datasetID, dsInfo);
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
+ this.idGenerator = idGenerator;
}
@Override
@@ -142,6 +146,7 @@
//This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
+ idGenerator.refresh();
for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
//get resource
ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 68f42e7..e445fe4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -20,27 +20,25 @@
package org.apache.asterix.common.ioopcallbacks;
import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
- private static final Logger LOGGER = Logger.getLogger(AbstractLSMIOOperationCallback.class.getName());
public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes());
public static final long INVALID = -1L;
@@ -56,8 +54,11 @@
// Index of the currently being written to component
protected int writeIndex;
- public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex) {
+ protected final ILSMComponentIdGenerator idGenerator;
+
+ public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
this.lsmIndex = lsmIndex;
+ this.idGenerator = idGenerator;
int count = lsmIndex.getNumberOfAllMemoryComponents();
mutableLastLSNs = new long[count];
firstLSNs = new long[count];
@@ -114,40 +115,22 @@
return pointable.getLength() == 0 ? INVALID : pointable.longValue();
}
- private ILSMDiskComponentId getComponentId(List<ILSMComponent> oldComponents) throws HyracksDataException {
- if (oldComponents == null) {
- //if oldComponents == null, then getComponentLSN would treat it as a flush operation,
- //and return the LSN for the flushed component
- long id = getComponentLSN(null);
- if (id == 0) {
- LOGGER.log(Level.WARNING, "Flushing a memory component without setting the LSN");
- id = ILSMDiskComponentId.NOT_FOUND;
- }
- return new LSMDiskComponentId(id, id);
- } else {
- long minId = Long.MAX_VALUE;
- long maxId = Long.MIN_VALUE;
- for (ILSMComponent oldComponent : oldComponents) {
- ILSMDiskComponentId oldComponentId = ((ILSMDiskComponent) oldComponent).getComponentId();
- if (oldComponentId.getMinId() < minId) {
- minId = oldComponentId.getMinId();
- }
- if (oldComponentId.getMaxId() > maxId) {
- maxId = oldComponentId.getMaxId();
- }
- }
- return new LSMDiskComponentId(minId, maxId);
+ private ILSMComponentId getMergedComponentId(List<ILSMComponent> mergedComponents) throws HyracksDataException {
+ if (mergedComponents == null || mergedComponents.isEmpty()) {
+ return null;
}
+ return LSMComponentIdUtils.union(mergedComponents.get(0).getId(),
+ mergedComponents.get(mergedComponents.size() - 1).getId());
+
}
- private void putComponentIdIntoMetadata(ILSMDiskComponent component, List<ILSMComponent> oldComponents)
- throws HyracksDataException {
- DiskComponentMetadata metadata = component.getMetadata();
- ILSMDiskComponentId componentId = getComponentId(oldComponents);
- metadata.put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
- LongPointable.FACTORY.createPointable(componentId.getMinId()));
- metadata.put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
- LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+ private void putComponentIdIntoMetadata(LSMIOOperationType opType, ILSMDiskComponent newComponent,
+ List<ILSMComponent> oldComponents) throws HyracksDataException {
+ // the id of flushed component is set when we copy the metadata of the memory component
+ if (opType == LSMIOOperationType.MERGE) {
+ ILSMComponentId componentId = getMergedComponentId(oldComponents);
+ LSMComponentIdUtils.persist(componentId, newComponent.getMetadata());
+ }
}
public synchronized void updateLastLSN(long lastLSN) {
@@ -188,7 +171,7 @@
//TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
if (newComponent != null) {
putLSNIntoMetadata(newComponent, oldComponents);
- putComponentIdIntoMetadata(newComponent, oldComponents);
+ putComponentIdIntoMetadata(opType, newComponent, oldComponents);
if (opType == LSMIOOperationType.MERGE) {
// In case of merge, oldComponents are never null
LongPointable markerLsn =
@@ -196,7 +179,6 @@
ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
}
-
}
}
@@ -220,12 +202,12 @@
@Override
public void recycled(ILSMMemoryComponent component) throws HyracksDataException {
- // No op
+ component.resetId(idGenerator.getId());
}
@Override
public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
- // No op
+ component.resetId(idGenerator.getId());
}
/**
@@ -237,4 +219,5 @@
*/
public abstract long getComponentFileLSNOffset(ILSMDiskComponent component, String componentFilePath)
throws HyracksDataException;
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
new file mode 100644
index 0000000..16447fd
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+
+/**
+ * Base class for LSM index I/O operation callback factories. It holds the
+ * {@link ILSMComponentIdGeneratorFactory} and the {@link INCServiceContext} needed to look up the
+ * {@link ILSMComponentIdGenerator} that subclasses pass to the callbacks they create.
+ */
+public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Looks up the component id generator for a given service context. */
+ protected final ILSMComponentIdGeneratorFactory idGeneratorFactory;
+
+ /** Service context set by {@link #initialize(INCServiceContext)}; transient, hence not serialized. */
+ protected transient INCServiceContext ncCtx;
+
+ /**
+ * @param idGeneratorFactory factory used later to obtain the component id generator
+ */
+ public AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+ this.idGeneratorFactory = idGeneratorFactory;
+ }
+
+ /** Stores the service context; must be called before {@link #getComponentIdGenerator()}. */
+ @Override
+ public void initialize(INCServiceContext ncCtx) {
+ this.ncCtx = ncCtx;
+ }
+
+ /**
+ * Returns the component id generator that {@link #idGeneratorFactory} provides for the stored service context.
+ * The context is only checked with an {@code assert}, so when assertions are disabled a {@code null} context
+ * is passed on to the factory unchecked.
+ *
+ * @return the generator to hand to a newly created I/O operation callback
+ */
+ protected ILSMComponentIdGenerator getComponentIdGenerator() {
+ assert ncCtx != null;
+ return idGeneratorFactory.getComponentIdGenerator(ncCtx);
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index c7fbb65..c1ee03b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -23,13 +23,14 @@
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMBTreeIOOperationCallback(ILSMIndex index) {
- super(index);
+ public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
+ super(index, idGenerator);
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index e3abb6b..4ef12ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -19,21 +19,20 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class LSMBTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
- public static LSMBTreeIOOperationCallbackFactory INSTANCE = new LSMBTreeIOOperationCallbackFactory();
-
- private LSMBTreeIOOperationCallbackFactory() {
+ public LSMBTreeIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+ super(idGeneratorFactory);
}
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMBTreeIOOperationCallback(index);
+ return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index 67d623a..b43fb2f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -22,13 +22,14 @@
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex) {
- super(lsmIndex);
+ public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
+ super(lsmIndex, idGenerator);
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 93f505c..6727bf6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -18,22 +18,20 @@
*/
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class LSMBTreeWithBuddyIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class LSMBTreeWithBuddyIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
- public static final LSMBTreeWithBuddyIOOperationCallbackFactory INSTANCE =
- new LSMBTreeWithBuddyIOOperationCallbackFactory();
-
- private LSMBTreeWithBuddyIOOperationCallbackFactory() {
+ public LSMBTreeWithBuddyIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+ super(idGeneratorFactory);
}
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMBTreeWithBuddyIOOperationCallback(index);
+ return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 2d27b78..015cd38 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
@@ -28,8 +29,8 @@
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMInvertedIndexIOOperationCallback(ILSMIndex index) {
- super(index);
+ public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
+ super(index, idGenerator);
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index 47a67b2..a2712d1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -19,22 +19,20 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class LSMInvertedIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class LSMInvertedIndexIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
- public static final LSMInvertedIndexIOOperationCallbackFactory INSTANCE =
- new LSMInvertedIndexIOOperationCallbackFactory();
-
- private LSMInvertedIndexIOOperationCallbackFactory() {
+ public LSMInvertedIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+ super(idGeneratorFactory);
}
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMInvertedIndexIOOperationCallback(index);
+ return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 9ba99f9..bc79074 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
@@ -28,8 +29,8 @@
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMRTreeIOOperationCallback(ILSMIndex index) {
- super(index);
+ public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
+ super(index, idGenerator);
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 14cf648..087aaae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -19,21 +19,20 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class LSMRTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
private static final long serialVersionUID = 1L;
- public static final LSMRTreeIOOperationCallbackFactory INSTANCE = new LSMRTreeIOOperationCallbackFactory();
-
- private LSMRTreeIOOperationCallbackFactory() {
+ public LSMRTreeIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+ super(idGeneratorFactory);
}
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMRTreeIOOperationCallback(index);
+ return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator());
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 01f08db..2928d90 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -33,13 +33,13 @@
import org.apache.asterix.common.context.IndexInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.junit.Assert;
import org.junit.Test;
@@ -62,14 +62,13 @@
@Test
public void testBasic() {
try {
- List<ILSMDiskComponentId> componentIDs =
- Arrays.asList(new LSMDiskComponentId(5, 5), new LSMDiskComponentId(4, 4),
- new LSMDiskComponentId(3, 3), new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1));
+ List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(5, 5), new LSMComponentId(4, 4),
+ new LSMComponentId(3, 3), new LSMComponentId(2, 2), new LSMComponentId(1, 1));
- List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>();
IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
- List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>();
IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
@@ -79,10 +78,10 @@
policy.diskComponentAdded(primary.getIndex(), false);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3),
- new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultPrimaryIDs);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3),
- new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultSecondaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMComponentId(4, 4), new LSMComponentId(3, 3),
+ new LSMComponentId(2, 2), new LSMComponentId(1, 1)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMComponentId(4, 4), new LSMComponentId(3, 3),
+ new LSMComponentId(2, 2), new LSMComponentId(1, 1)), resultSecondaryIDs);
} catch (HyracksDataException e) {
Assert.fail(e.getMessage());
@@ -93,14 +92,13 @@
@Test
public void testIDIntervals() {
try {
- List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
- new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
- new LSMDiskComponentId(10, 19));
+ List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35),
+ new LSMComponentId(25, 29), new LSMComponentId(20, 24), new LSMComponentId(10, 19));
- List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>();
IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
- List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>();
IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
@@ -110,10 +108,10 @@
policy.diskComponentAdded(primary.getIndex(), false);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
- new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
- new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+ new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+ new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultSecondaryIDs);
} catch (HyracksDataException e) {
Assert.fail(e.getMessage());
@@ -123,15 +121,15 @@
@Test
public void testSecondaryMissing() {
try {
- List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
- new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
- new LSMDiskComponentId(10, 19));
- List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ List<ILSMComponentId> primaryComponentIDs =
+ Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+ new LSMComponentId(20, 24), new LSMComponentId(10, 19));
+ List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>();
IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
- List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
- new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24));
- List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ List<ILSMComponentId> secondaryComponentIDs =
+ Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), new LSMComponentId(20, 24));
+ List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>();
IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
@@ -141,10 +139,11 @@
Assert.assertTrue(resultSecondaryIDs.isEmpty());
policy.diskComponentAdded(primary.getIndex(), false);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
- new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
- new LSMDiskComponentId(20, 24)), resultSecondaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+ new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs);
+ Assert.assertEquals(
+ Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), new LSMComponentId(20, 24)),
+ resultSecondaryIDs);
} catch (HyracksDataException e) {
Assert.fail(e.getMessage());
@@ -154,17 +153,16 @@
@Test
public void testMultiPartition() {
try {
- List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
- new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
- new LSMDiskComponentId(10, 19));
+ List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35),
+ new LSMComponentId(25, 29), new LSMComponentId(20, 24), new LSMComponentId(10, 19));
- List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+ List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>();
IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
- List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+ List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>();
IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
- List<ILSMDiskComponentId> resultSecondaryIDs1 = new ArrayList<>();
+ List<ILSMComponentId> resultSecondaryIDs1 = new ArrayList<>();
- IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs, 1);
+ IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs1, 1);
ILSMMergePolicy policy = mockMergePolicy(primary, secondary, secondary1);
@@ -174,10 +172,10 @@
policy.diskComponentAdded(primary.getIndex(), false);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
- new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
- Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
- new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+ new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs);
+ Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+ new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultSecondaryIDs);
Assert.assertTrue(resultSecondaryIDs1.isEmpty());
} catch (HyracksDataException e) {
Assert.fail(e.getMessage());
@@ -205,12 +203,12 @@
return policy;
}
- private IndexInfo mockIndex(boolean isPrimary, List<ILSMDiskComponentId> componentIDs,
- List<ILSMDiskComponentId> resultComponentIDs, int partition) throws HyracksDataException {
+ private IndexInfo mockIndex(boolean isPrimary, List<ILSMComponentId> componentIDs,
+ List<ILSMComponentId> resultComponentIDs, int partition) throws HyracksDataException {
List<ILSMDiskComponent> components = new ArrayList<>();
- for (ILSMDiskComponentId id : componentIDs) {
+ for (ILSMComponentId id : componentIDs) {
ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
- Mockito.when(component.getComponentId()).thenReturn(id);
+ Mockito.when(component.getId()).thenReturn(id);
Mockito.when(component.getComponentSize()).thenReturn(DEFAULT_COMPONENT_SIZE);
Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE);
components.add(component);
@@ -227,7 +225,7 @@
List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class);
mergedComponents.forEach(component -> {
try {
- resultComponentIDs.add(component.getComponentId());
+ resultComponentIDs.add(component.getId());
} catch (HyracksDataException e) {
e.printStackTrace();
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
index d48227f..f467ee8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -34,7 +35,8 @@
try {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex);
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
//request to flush first component
callback.updateLastLSN(1);
@@ -58,7 +60,8 @@
try {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex);
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
//request to flush first component
callback.updateLastLSN(1);
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
index 94ef0a3..63c46f7 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -34,7 +35,8 @@
try {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeWithBuddyIOOperationCallback callback = new LSMBTreeWithBuddyIOOperationCallback(mockIndex);
+ LSMBTreeWithBuddyIOOperationCallback callback =
+ new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
//request to flush first component
callback.updateLastLSN(1);
@@ -58,7 +60,8 @@
try {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeWithBuddyIOOperationCallback callback = new LSMBTreeWithBuddyIOOperationCallback(mockIndex);
+ LSMBTreeWithBuddyIOOperationCallback callback =
+ new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
//request to flush first component
callback.updateLastLSN(1);
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
index b213da0..1e961d8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -34,7 +35,8 @@
try {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMInvertedIndexIOOperationCallback callback = new LSMInvertedIndexIOOperationCallback(mockIndex);
+ LSMInvertedIndexIOOperationCallback callback =
+ new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
//request to flush first component
callback.updateLastLSN(1);
@@ -58,7 +60,8 @@
try {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMInvertedIndexIOOperationCallback callback = new LSMInvertedIndexIOOperationCallback(mockIndex);
+ LSMInvertedIndexIOOperationCallback callback =
+ new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
//request to flush first component
callback.updateLastLSN(1);
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
index df26ef9..618f2a3 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -34,7 +35,8 @@
try {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMRTreeIOOperationCallback callback = new LSMRTreeIOOperationCallback(mockIndex);
+ LSMRTreeIOOperationCallback callback =
+ new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
//request to flush first component
callback.updateLastLSN(1);
@@ -58,7 +60,8 @@
try {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMRTreeIOOperationCallback callback = new LSMRTreeIOOperationCallback(mockIndex);
+ LSMRTreeIOOperationCallback callback =
+ new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
//request to flush first component
callback.updateLastLSN(1);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index a6f1ad0..d2622c4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -79,6 +80,7 @@
import org.apache.hyracks.storage.am.common.build.IndexBuilder;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -324,7 +326,10 @@
ILSMOperationTrackerFactory opTrackerFactory =
index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(datasetId)
: new SecondaryIndexOperationTrackerFactory(datasetId);
- ILSMIOOperationCallbackFactory ioOpCallbackFactory = LSMBTreeIOOperationCallbackFactory.INSTANCE;
+ ILSMComponentIdGeneratorFactory idGeneratorProvider =
+ new DatasetLSMComponentIdGeneratorFactory(index.getDatasetId().getId());
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory =
+ new LSMBTreeIOOperationCallbackFactory(idGeneratorProvider);
IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider();
if (isNewUniverse()) {
LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory(
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 2f94ad74..fb9901d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -87,6 +87,8 @@
import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -133,7 +135,6 @@
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -595,9 +596,9 @@
// bulkload?)
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
- new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory);
+ LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
+ fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
+ indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId());
return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
@@ -1001,8 +1002,9 @@
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+ op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
+ BulkLoadUsage.LOAD, dataset.getDatasetId());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
null, true, modificationCallbackFactory);
@@ -1135,8 +1137,9 @@
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+ op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
+ BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
filterFactory, modificationCallbackFactory, prevFieldPermutation);
@@ -1237,9 +1240,9 @@
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+ op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
- indexDataflowHelperFactory);
+ indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation);
@@ -1353,8 +1356,9 @@
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory);
+ op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
+ null, BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
filterFactory, modificationCallbackFactory, prevFieldPermutation);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 3fec73b..e5f97f0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -30,6 +30,7 @@
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveNotificationHandler;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
@@ -107,6 +108,7 @@
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -505,15 +507,15 @@
case BTREE:
return getDatasetType() == DatasetType.EXTERNAL
&& !index.getIndexName().equals(IndexingConstants.getFilesIndexName(getDatasetName()))
- ? LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
- : LSMBTreeIOOperationCallbackFactory.INSTANCE;
+ ? new LSMBTreeWithBuddyIOOperationCallbackFactory(getComponentIdGeneratorFactory())
+ : new LSMBTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory());
case RTREE:
- return LSMRTreeIOOperationCallbackFactory.INSTANCE;
+ return new LSMRTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory());
case LENGTH_PARTITIONED_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case SINGLE_PARTITION_WORD_INVIX:
- return LSMInvertedIndexIOOperationCallbackFactory.INSTANCE;
+ return new LSMInvertedIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory());
default:
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
index.getIndexType().toString());
@@ -532,6 +534,10 @@
: new SecondaryIndexOperationTrackerFactory(getDatasetId());
}
+ public ILSMComponentIdGeneratorFactory getComponentIdGeneratorFactory() {
+ return new DatasetLSMComponentIdGeneratorFactory(getDatasetId()); // new instance per call, from this dataset's id
+ }
+
/**
* Get search callback factory for this dataset with the passed index and operation
*
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5dac407..7701f65 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,8 +19,6 @@
package org.apache.asterix.metadata.utils;
-import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
-
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -51,6 +49,8 @@
import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -71,6 +71,8 @@
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
@@ -124,8 +126,8 @@
this.index = index;
this.physOptConf = physOptConf;
this.metadataProvider = metadataProvider;
- this.itemType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
- dataset.getItemTypeName());
+ this.itemType =
+ (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
this.metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
Pair<ARecordType, ARecordType> enforcedTypes = getEnforcedType(index, itemType, metaType);
this.enforcedItemType = enforcedTypes.first;
@@ -341,11 +343,15 @@
return sortOp;
}
- protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+ protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
throws AlgebricksException {
- TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory);
+ IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory(
+ metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+
+ LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
+ secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
+ primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
secondaryPartitionConstraint);
return treeIndexBulkLoadOp;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
new file mode 100644
index 0000000..74590c7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+
+public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor {
+ // Bulk-load descriptor whose runtime is an LSMIndexBulkLoadOperatorNodePushable (see createPushRuntime).
+ private static final long serialVersionUID = 1L;
+ /** Purpose of the bulk load; handed unchanged to the runtime pushable. */
+ public enum BulkLoadUsage {
+ LOAD,
+ CREATE_INDEX
+ }
+ /** Helper factory for the primary index, forwarded to the runtime pushable; may be null. */
+ protected final IIndexDataflowHelperFactory primaryIndexHelperFactory;
+
+ protected final BulkLoadUsage usage;
+ /** Dataset id supplied by the caller and forwarded to the runtime pushable. */
+ protected final int datasetId;
+ /** Passes the first eight arguments to the base descriptor and stores the three LSM-specific ones. */
+ public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+ int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory,
+ IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId) {
+ super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ indexHelperFactory);
+ this.primaryIndexHelperFactory = primaryIndexHelperFactory;
+ this.usage = usage;
+ this.datasetId = datasetId;
+ }
+ /** Creates the runtime pushable from this descriptor's settings and the record descriptor of input 0. */
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, primaryIndexHelperFactory, ctx, partition,
+ fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, datasetId);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
new file mode 100644
index 0000000..2415556
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+
+/**
+ * Bulk-load runtime for LSM indexes that persists a component id into the loaded disk component's metadata:
+ * the default id for LOAD, otherwise the union of the primary index's first and last disk component ids.
+ */
+public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
+    protected final BulkLoadUsage usage;
+    protected final IIndexDataflowHelper primaryIndexHelper;
+    protected final IDatasetLifecycleManager datasetManager;
+    protected final int datasetId;
+    protected final int partition;
+    protected ILSMIndex primaryIndex;
+
+    /** The primary index helper factory may be null for LOAD; any other usage needs it to open the primary index. */
+    public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory,
+            IIndexDataflowHelperFactory primaryIndexDataflowHelperFactory, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId)
+            throws HyracksDataException {
+        super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint,
+                checkIfEmptyIndex, recDesc);
+        this.primaryIndexHelper = primaryIndexDataflowHelperFactory == null ? null
+                : primaryIndexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        this.usage = usage;
+        this.datasetId = datasetId;
+        this.partition = partition;
+        INcApplicationContext ncCtx =
+                (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+        datasetManager = ncCtx.getDatasetLifecycleManager();
+    }
+
+    /** Creates the bulk loader and persists a component id into the metadata of the component being loaded. */
+    @Override
+    protected void initializeBulkLoader() throws HyracksDataException {
+        ILSMIndex targetIndex = (ILSMIndex) index;
+        if (usage == BulkLoadUsage.LOAD) {
+            // loaded datasets use the default Id 0, guaranteed to be smaller than the Ids of all memory components
+            // TODO handle component Id for datasets loaded multiple times
+            // TODO move this piece of code to io operation callback
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
+            LSMComponentIdUtils.persist(LSMComponentId.DEFAULT_COMPONENT_ID, diskComponent.getMetadata());
+        } else {
+            if (primaryIndexHelper == null) {
+                throw HyracksDataException.create(new IllegalStateException(
+                        "A primary index helper factory is required for bulk load usage " + usage));
+            }
+            primaryIndexHelper.open();
+            primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
+            List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            if (!primaryComponents.isEmpty()) {
+                // TODO move this into the io operation callback, run when the bulk load finishes; there is
+                // currently no extensible callback mechanism to support that
+                ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
+                        primaryComponents.get(primaryComponents.size() - 1).getId());
+                ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
+                LSMComponentIdUtils.persist(bulkloadId, diskComponent.getMetadata());
+            }
+        }
+    }
+
+    /** Closes the bulk load, then releases the primary index helper if the primary index was obtained. */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            super.close();
+        } finally {
+            if (primaryIndex != null) {
+                primaryIndexHelper.close();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index cee20ce..6d9ec47 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -19,24 +19,20 @@
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
/**
* This operator node is used to bulk load incoming tuples (scanned from the primary index)
@@ -56,12 +52,9 @@
private ILSMIndex primaryIndex;
private ILSMIndex secondaryIndex;
- private ILSMDiskComponent component;
- private ILSMDiskComponentBulkLoader componentBulkLoader;
+ private LSMIndexDiskComponentBulkLoader componentBulkLoader;
private int currentComponentPos = -1;
- private ILSMDiskComponent[] diskComponents;
-
public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
IIndexDataflowHelperFactory primaryIndexHelperFactory,
IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
@@ -92,7 +85,6 @@
super.open();
primaryIndexHelper.open();
primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
- diskComponents = new ILSMDiskComponent[primaryIndex.getDiskComponents().size()];
secondaryIndexHelper.open();
secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance();
@@ -107,8 +99,6 @@
closeException = e;
}
- activateComponents();
-
try {
if (primaryIndexHelper != null) {
primaryIndexHelper.close();
@@ -184,24 +174,22 @@
}
private void endCurrentComponent() throws HyracksDataException {
- if (component != null) {
- // set disk component id
-
+ if (componentBulkLoader != null) {
componentBulkLoader.end();
- diskComponents[currentComponentPos] = component;
-
componentBulkLoader = null;
- component = null;
}
}
private void loadNewComponent(int componentPos) throws HyracksDataException {
endCurrentComponent();
- component = secondaryIndex.createBulkLoadTarget();
int numTuples = getNumDeletedTuples(componentPos);
- componentBulkLoader = component.createBulkLoader(1.0f, false, numTuples, false, true, true);
-
+ ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos);
+ componentBulkLoader =
+ (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples, false);
+ ILSMDiskComponent diskComponent = componentBulkLoader.getComponent();
+ // TODO move this piece of code to io operation callback
+ LSMComponentIdUtils.persist(primaryComponent.getId(), diskComponent.getMetadata());
}
private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {
@@ -220,30 +208,6 @@
}
- private void activateComponents() throws HyracksDataException {
- List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
- for (int i = diskComponents.length - 1; i >= 0; i--) {
- // start from the oldest component to the newest component
- if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) {
- secondaryIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.FLUSH, null,
- diskComponents[i]);
-
- // setting component id has to be place between afterOperation and addBulkLoadedComponent,
- // since afterOperation would set a flush component id (but it's not invalid)
- // and addBulkLoadedComponent would finalize the component
- ILSMDiskComponentId primaryComponentId = primaryComponents.get(i).getComponentId();
- //set component id
- diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
- LongPointable.FACTORY.createPointable(primaryComponentId.getMinId()));
- diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
- LongPointable.FACTORY.createPointable(primaryComponentId.getMaxId()));
-
- ((AbstractLSMIndex) secondaryIndex).getLsmHarness().addBulkLoadedComponent(diskComponents[i]);
-
- }
- }
- }
-
private int getNumDeletedTuples(int componentPos) {
DeletedTupleCounter counter = (DeletedTupleCounter) ctx.getStateObject(partition);
return counter.get(componentPos);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 5fc07ad..095159b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -65,7 +65,7 @@
index = indexHelper.getIndexInstance();
try {
writer.open();
- bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+ initializeBulkLoader();
} catch (Exception e) {
throw HyracksDataException.create(e);
}
@@ -116,4 +116,8 @@
writer.fail();
}
}
+
+ protected void initializeBulkLoader() throws HyracksDataException { // hook: subclasses may override
+ bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 6083637..a859f68 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -57,6 +57,7 @@
public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
+ ioOpCallbackFactory.initialize(serviceCtx);
return LSMBTreeUtil.createExternalBTree(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits,
cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 04b63f9..9422253 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -60,6 +60,7 @@
public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
+ ioOpCallbackFactory.initialize(serviceCtx);
return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, storageManager.getBufferCache(serviceCtx),
typeTraits, cmpFactories, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index dfa88da..1988736 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -70,6 +70,7 @@
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+ ioOpCallbackFactory.initialize(serviceCtx);
//TODO: enable updateAwareness for secondary LSMBTree indexes
boolean updateAware = false;
return LSMBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index a8e707f..d759167 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -24,11 +24,10 @@
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent implements ILSMDiskComponent {
+public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
protected final BTree btree;
public LSMBTreeDiskComponent(AbstractLSMIndex lsmIndex, BTree btree, ILSMComponentFilter filter) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index a60f544..ab8e899 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -127,4 +127,11 @@
* @return index data structure that is the stored in the component
*/
IIndex getIndex();
+
+ /**
+ *
+ * @return id of the component
+ * @throws HyracksDataException
+ */
+ ILSMComponentId getId() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
new file mode 100644
index 0000000..5662862
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+/**
+ * Stores the id of an LSM component, which is an interval [minId, maxId].
+ * It is generated by {@link ILSMComponentIdGenerator}
+ * The {@link IdCompareResult} meanings noted below follow the LSMComponentId implementation of compareTo.
+ */
+public interface ILSMComponentId {
+ public enum IdCompareResult {
+ UNKNOWN, // the ids cannot be ordered, e.g. one of them is missing
+ LESS_THAN, // this interval lies entirely below the other one
+ GREATER_THAN, // this interval lies entirely above the other one
+ INTERSECT, // the intervals overlap, but this one does not contain the other
+ INCLUDE // this interval contains the other one
+ }
+
+ /**
+ * @return whether the id is missing
+ */
+ boolean missing();
+
+ IdCompareResult compareTo(ILSMComponentId id);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
new file mode 100644
index 0000000..5dd3061
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+/**
+ * This interface generates component Ids for LSM components (both memory and disk components).
+ */
+public interface ILSMComponentIdGenerator {
+
+ /**
+ * @return An Id for LSM component
+ */
+ public ILSMComponentId getId();
+
+ /**
+ * Refresh the component Id generator to generate the next Id.
+ * {@link #getId()} would always return the same Id before this method is called.
+ */
+ public void refresh();
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..c0f530b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+
+@FunctionalInterface
+public interface ILSMComponentIdGeneratorFactory extends Serializable {
+
+ ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 43c5482..bd2bb45 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -48,14 +48,6 @@
int getFileReferenceCount();
/**
- * Return the component Id of this disk component from its metadata
- *
- * @return
- * @throws HyracksDataException
- */
- ILSMDiskComponentId getComponentId() throws HyracksDataException;
-
- /**
* @return LsmIndex of the component
*/
AbstractLSMIndex getLsmIndex();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
deleted file mode 100644
index 5d38ace..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.common.api;
-
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
-
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * When a disk component is formed by the flush operation, its initial minId and maxId are the same, and
- * currently are set as the flush LSN.
- * When a disk component is formed by the merge operation, its [minId, maxId] is set as the union of
- * all ids of merged disk components.
- *
- * @author luochen
- *
- */
-public interface ILSMDiskComponentId {
-
- public static final long NOT_FOUND = -1;
-
- public static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
- new MutableArrayValueReference("Component_Id_Min".getBytes());
-
- public static final MutableArrayValueReference COMPONENT_ID_MAX_KEY =
- new MutableArrayValueReference("Component_Id_Max".getBytes());
-
- long getMinId();
-
- long getMaxId();
-
- default boolean notFound() {
- return getMinId() == NOT_FOUND || getMaxId() == NOT_FOUND;
- }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
index b291f7c..a9dc50e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -20,7 +20,15 @@
import java.io.Serializable;
-@FunctionalInterface
+import org.apache.hyracks.api.application.INCServiceContext;
+
public interface ILSMIOOperationCallbackFactory extends Serializable {
+ /**
+ * Initialize the callback factory with the given ncCtx
+ * Local resources (e.g. ExternalBTreeWithBuddyLocalResource#createInstance) call this before creating the index.
+ * @param ncCtx the NC service context passed in by the caller
+ */
+ void initialize(INCServiceContext ncCtx);
+
ILSMIOOperationCallback createIoOpCallback(ILSMIndex index);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index 13543e4..f892585 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -101,4 +101,12 @@
* @return the size of the memory component
*/
long getSize();
+
+ /**
+ * Reset the component Id of the memory component after it's recycled
+ * AbstractLSMMemoryComponent rejects a new Id unless the current Id compares as LESS_THAN it.
+ * @param newId the Id this memory component carries from now on
+ * @throws HyracksDataException
+ */
+ void resetId(ILSMComponentId newId) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index a0d1c23..b664102 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -18,19 +18,29 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
import org.apache.hyracks.storage.common.MultiComparator;
public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent {
+ private static final Logger LOGGER = Logger.getLogger(AbstractLSMDiskComponent.class.getName());
+
private final DiskComponentMetadata metadata;
+ // a cache of the componentId stored in metadata, so that metadata is not read on every request.
+ // volatile: getId() reads this field outside its synchronized block (double-checked lazy init),
+ // so the reference must be safely published to other threads.
+ private volatile ILSMComponentId componentId;
+
public AbstractLSMDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager,
ILSMComponentFilter filter) {
super(lsmIndex, filter);
@@ -109,13 +119,23 @@
}
@Override
- public ILSMDiskComponentId getComponentId() throws HyracksDataException {
- long minID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
- ILSMDiskComponentId.NOT_FOUND);
- long maxID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
- ILSMDiskComponentId.NOT_FOUND);
- //TODO: do we need to throw an exception when ID is not found?
- return new LSMDiskComponentId(minID, maxID);
+ public ILSMComponentId getId() throws HyracksDataException {
+ if (componentId != null) {
+ return componentId;
+ }
+ synchronized (this) {
+ if (componentId == null) {
+ componentId = LSMComponentIdUtils.readFrom(metadata);
+ }
+ }
+ if (componentId.missing()) {
+ // For normal datasets, componentId shouldn't be missing, since otherwise it'll be a bug.
+ // However, we cannot throw an exception here to be compatible with legacy datasets.
+ // In this case, the disk component would always get a garbage Id [-1, -1], which makes the
+ // component Id-based optimization useless but still correct.
+ LOGGER.warning("Component Id not found from disk component metadata");
+ }
+ return componentId;
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 2b2fe0d..b0cc318 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -44,6 +44,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -438,6 +440,7 @@
if (c != EmptyComponent.INSTANCE) {
diskComponents.add(0, c);
}
+ assert checkComponentIds();
}
@Override
@@ -448,6 +451,25 @@
if (newComponent != EmptyComponent.INSTANCE) {
diskComponents.add(swapIndex, newComponent);
}
+ assert checkComponentIds();
+ }
+
+ /**
+ * A helper method to ensure disk components have proper Ids: the Id at position i must compare as
+ * GREATER_THAN (or UNKNOWN) to the Id at position i + 1. We may drop this once component Id is stabilized
+ * @return true if every adjacent pair of disk components passes the check, false otherwise
+ * @throws HyracksDataException
+ */
+ private boolean checkComponentIds() throws HyracksDataException {
+ for (int i = 0; i < diskComponents.size() - 1; i++) {
+ ILSMComponentId id1 = diskComponents.get(i).getId();
+ ILSMComponentId id2 = diskComponents.get(i + 1).getId();
+ IdCompareResult cmp = id1.compareTo(id2);
+ if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) {
+ return false;
+ }
+ }
+ return true;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index b7c3350..0378aae 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -22,9 +22,12 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent {
@@ -34,6 +37,7 @@
private int writerCount;
private boolean requestedToBeActive;
private final MemoryComponentMetadata metadata;
+ private ILSMComponentId componentId;
public AbstractLSMMemoryComponent(AbstractLSMIndex lsmIndex, IVirtualBufferCache vbc, boolean isActive,
ILSMComponentFilter filter) {
@@ -247,6 +251,7 @@
protected void doDeallocate() throws HyracksDataException {
getIndex().deactivate();
getIndex().destroy();
+ componentId = null;
}
@Override
@@ -259,4 +264,19 @@
IBufferCache virtualBufferCache = getIndex().getBufferCache();
return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize();
}
+
+ @Override
+ public ILSMComponentId getId() {
+ return componentId;
+ }
+
+ @Override
+ public void resetId(ILSMComponentId componentId) throws HyracksDataException {
+ if (this.componentId != null && this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) {
+ throw new IllegalStateException(
+ "LSM memory component receives illegal id. Old id " + this.componentId + ", new id " + componentId);
+ }
+ this.componentId = componentId;
+ LSMComponentIdUtils.persist(this.componentId, metadata);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index f2751bf..e3ca9f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -25,8 +25,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndex;
@@ -83,8 +83,8 @@
}
@Override
- public ILSMDiskComponentId getComponentId() throws HyracksDataException {
- return null;
+ public ILSMComponentId getId() {
+ return LSMComponentId.MISSING_COMPONENT_ID;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
new file mode 100644
index 0000000..dd86f65
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+public class LSMComponentId implements ILSMComponentId {
+
+ public static final long NOT_FOUND = -1;
+
+ // Used to handle legacy datasets which do not have a component Id
+ public static final ILSMComponentId MISSING_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
+
+ // A default component id used for bulk loaded component
+ public static final ILSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+
+ private long minId;
+
+ private long maxId;
+
+ public LSMComponentId(long minId, long maxId) {
+ assert minId <= maxId;
+ this.minId = minId;
+ this.maxId = maxId;
+ }
+
+ public void reset(long minId, long maxId) {
+ this.minId = minId;
+ this.maxId = maxId;
+ }
+
+ public long getMinId() {
+ return this.minId;
+ }
+
+ public long getMaxId() {
+ return this.maxId;
+ }
+
+ @Override
+ public boolean missing() {
+ return minId == NOT_FOUND || maxId == NOT_FOUND;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + minId + "," + maxId + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof LSMComponentId)) {
+ return false;
+ }
+ LSMComponentId other = (LSMComponentId) obj;
+ if (maxId != other.maxId) {
+ return false;
+ }
+ if (minId != other.minId) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public IdCompareResult compareTo(ILSMComponentId id) {
+ // Ids that are missing, null, or of another implementation cannot be ordered against this interval.
+ if (this.missing() || !(id instanceof LSMComponentId) || id.missing()) {
+ return IdCompareResult.UNKNOWN;
+ }
+ LSMComponentId componentId = (LSMComponentId) id;
+ if (this.getMinId() > componentId.getMaxId()) {
+ return IdCompareResult.GREATER_THAN; // this interval lies entirely above the other one
+ } else if (this.getMaxId() < componentId.getMinId()) {
+ return IdCompareResult.LESS_THAN; // this interval lies entirely below the other one
+ } else if (this.getMinId() <= componentId.getMinId() && this.getMaxId() >= componentId.getMaxId()) {
+ return IdCompareResult.INCLUDE; // this interval contains the other one
+ }
+ return IdCompareResult.INTERSECT; // overlap without containment
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
new file mode 100644
index 0000000..e174153
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGenerator}.
+ * Each Id is the interval [ts, ts], where ts is a System.currentTimeMillis() reading greater than the previous one.
+ */
+public class LSMComponentIdGenerator implements ILSMComponentIdGenerator {
+ // NOTE(review): nothing here is synchronized; confirm callers serialize refresh() and getId().
+ protected long previousTimestamp = -1L;
+
+ private ILSMComponentId componentId;
+
+ public LSMComponentIdGenerator() {
+ refresh();
+ }
+
+ @Override
+ public void refresh() {
+ long ts = getCurrentTimestamp();
+ componentId = new LSMComponentId(ts, ts);
+ }
+
+ @Override
+ public ILSMComponentId getId() {
+ return componentId;
+ }
+
+ protected long getCurrentTimestamp() {
+ long timestamp = System.currentTimeMillis();
+ while (timestamp <= previousTimestamp) {
+ // make sure timestamp is strictly increasing
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ timestamp = System.currentTimeMillis();
+ }
+ previousTimestamp = timestamp;
+ return timestamp;
+
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..c55ef19
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGeneratorFactory}.
+ *
+ */
+public class LSMComponentIdGeneratorFactory implements ILSMComponentIdGeneratorFactory {
+
+ @Override
+ public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+ return new LSMComponentIdGenerator();
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
deleted file mode 100644
index f448c84..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.storage.am.lsm.common.impls;
-
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
-
-public class LSMDiskComponentId implements ILSMDiskComponentId {
-
- private final long minId;
-
- private final long maxId;
-
- public LSMDiskComponentId(long minId, long maxId) {
- this.minId = minId;
- this.maxId = maxId;
- }
-
- @Override
- public long getMinId() {
- return this.minId;
- }
-
- @Override
- public long getMaxId() {
- return this.maxId;
- }
-
- @Override
- public String toString() {
- return "[" + minId + "," + maxId + "]";
- }
-
- @Override
- public int hashCode() {
- return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof LSMDiskComponentId)) {
- return false;
- }
- LSMDiskComponentId other = (LSMDiskComponentId) obj;
- if (maxId != other.maxId) {
- return false;
- }
- if (minId != other.minId) {
- return false;
- }
- return true;
- }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 48b6d8f..b0abeb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -95,15 +95,24 @@
// Before entering the components, prune those corner cases that indeed should not proceed.
switch (opType) {
case FLUSH:
+ // if the lsm index does not have memory components allocated, then nothing to flush
+ if (!lsmIndex.isMemoryComponentsAllocated()) {
+ return false;
+ }
ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
if (!flushingComponent.isModified()) {
- //The mutable component has not been modified by any writer. There is nothing to flush.
- //since the component is empty, set its state back to READABLE_WRITABLE
if (flushingComponent.getState() == ComponentState.READABLE_UNWRITABLE) {
+ // The mutable component has not been modified by any writer, so there is nothing to flush.
+ // Since the component is empty, set its state back to READABLE_WRITABLE, but only when its
+ // state has been set to READABLE_UNWRITABLE.
flushingComponent.setState(ComponentState.READABLE_WRITABLE);
opTracker.notifyAll();
+
+ // Call recycled only when the component's state is reset back to READABLE_WRITABLE.
+ // Otherwise, if the component is in another state, e.g., INACTIVE or
+ // READABLE_UNWRITABLE_FLUSHING, it is not considered as being recycled here.
+ lsmIndex.getIOOperationCallback().recycled(flushingComponent);
}
- lsmIndex.getIOOperationCallback().recycled(flushingComponent);
return false;
}
if (flushingComponent.getWriterCount() > 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 08b8bb6..000d5cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -21,13 +21,14 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
private final AbstractLSMIndex lsmIndex;
private final ILSMDiskComponent component;
- private final IIndexBulkLoader componentBulkLoader;
+ private final ILSMDiskComponentBulkLoader componentBulkLoader;
public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput,
long numElementsHint) throws HyracksDataException {
@@ -39,11 +40,19 @@
component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
}
+ /**
+ * Returns the disk component held by this bulk loader.
+ *
+ * @return the {@link ILSMDiskComponent} stored in the {@code component} field
+ */
+ public ILSMDiskComponent getComponent() {
+ return component;
+ }
+
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
componentBulkLoader.add(tuple);
}
+ /**
+ * Forwards a delete of the given tuple to the underlying component bulk loader.
+ *
+ * @param tuple
+ *            the tuple handed to {@code componentBulkLoader.delete}
+ * @throws HyracksDataException
+ *             if the underlying component bulk loader reports a failure
+ */
+ public void delete(ITupleReference tuple) throws HyracksDataException {
+ componentBulkLoader.delete(tuple);
+ }
+
@Override
public void end() throws HyracksDataException {
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 09ca553..21d10d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -37,6 +38,11 @@
return NoOpIOOperationCallback.INSTANCE;
}
+ @Override
+ public void initialize(INCServiceContext ncCtx) {
+ // No op: this factory only hands out the shared NoOpIOOperationCallback.INSTANCE and reads nothing from ncCtx
+ }
+
public static class NoOpIOOperationCallback implements ILSMIOOperationCallback {
private static final NoOpIOOperationCallback INSTANCE = new NoOpIOOperationCallback();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
new file mode 100644
index 0000000..3c88543
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.util;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+/**
+ * Helpers for persisting an {@link ILSMComponentId} into the metadata of an LSM component, reading it back, and
+ * merging two ids into one.
+ * <p>
+ * Methods that take an {@link ILSMComponentId} expect the {@link LSMComponentId} implementation; any other
+ * implementation makes them fail with a {@link ClassCastException}.
+ */
+public class LSMComponentIdUtils {
+
+    // Metadata keys for the two bounds of a component id. The key bytes are encoded with an explicit charset so
+    // that they do not depend on the default charset of the JVM that happens to write or read them.
+    private static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
+            new MutableArrayValueReference("Component_Id_Min".getBytes(StandardCharsets.UTF_8));
+
+    private static final MutableArrayValueReference COMPONENT_ID_MAX_KEY =
+            new MutableArrayValueReference("Component_Id_Max".getBytes(StandardCharsets.UTF_8));
+
+    private LSMComponentIdUtils() {
+        // static helpers only; never instantiated
+    }
+
+    /**
+     * Reads the component id stored in {@code metadata}.
+     *
+     * @param metadata
+     *            the component metadata to read the two id bounds from
+     * @return the stored id, or {@link LSMComponentId#MISSING_COMPONENT_ID} when either bound comes back as
+     *         {@link LSMComponentId#NOT_FOUND}, the default handed to the lookup
+     * @throws HyracksDataException
+     *             if looking up a bound fails
+     */
+    public static ILSMComponentId readFrom(IComponentMetadata metadata) throws HyracksDataException {
+        long minId = ComponentUtils.getLong(metadata, COMPONENT_ID_MIN_KEY, LSMComponentId.NOT_FOUND);
+        long maxId = ComponentUtils.getLong(metadata, COMPONENT_ID_MAX_KEY, LSMComponentId.NOT_FOUND);
+        if (minId == LSMComponentId.NOT_FOUND || maxId == LSMComponentId.NOT_FOUND) {
+            return LSMComponentId.MISSING_COMPONENT_ID;
+        }
+        return new LSMComponentId(minId, maxId);
+    }
+
+    /**
+     * Stores both bounds of {@code id} in {@code metadata}.
+     *
+     * @param id
+     *            the id to store; must be an {@link LSMComponentId}
+     * @param metadata
+     *            the component metadata that receives the two bounds
+     * @throws HyracksDataException
+     *             if writing to the metadata fails
+     */
+    public static void persist(ILSMComponentId id, IComponentMetadata metadata) throws HyracksDataException {
+        LSMComponentId componentId = (LSMComponentId) id;
+        metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(componentId.getMinId()));
+        metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+    }
+
+    /**
+     * Combines two ids into the smallest id range that covers both.
+     *
+     * @param id1
+     *            the first id; must be an {@link LSMComponentId}
+     * @param id2
+     *            the second id; must be an {@link LSMComponentId}
+     * @return a new id spanning the smaller of the two minimums to the larger of the two maximums
+     */
+    public static ILSMComponentId union(ILSMComponentId id1, ILSMComponentId id2) {
+        LSMComponentId first = (LSMComponentId) id1;
+        LSMComponentId second = (LSMComponentId) id2;
+        long minId = Long.min(first.getMinId(), second.getMinId());
+        long maxId = Long.max(first.getMaxId(), second.getMaxId());
+        return new LSMComponentId(minId, maxId);
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
index cdb55bf..a45f006 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
@@ -87,6 +87,7 @@
IBufferCache bufferCache = storageManager.getBufferCache(serviceCtx);
ILSMMergePolicy mergePolicy = mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx);
ILSMIOOperationScheduler ioScheduler = ioSchedulerProvider.getIoScheduler(serviceCtx);
+ ioOpCallbackFactory.initialize(serviceCtx);
if (isPartitioned) {
return InvertedIndexUtils.createPartitionedLSMInvertedIndex(ioManager, virtualBufferCaches, typeTraits,
cmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, bufferCache,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
index f0ea5b8..6918b19 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
@@ -66,6 +66,7 @@
public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
IIOManager ioManager = ncServiceCtx.getIoManager();
FileReference fileRef = ioManager.resolve(path);
+ ioOpCallbackFactory.initialize(ncServiceCtx);
return LSMRTreeUtils.createExternalRTree(ioManager, fileRef, storageManager.getBufferCache(ncServiceCtx),
typeTraits, cmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType,
bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx),
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
index 1fa5081..f6396cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
@@ -83,6 +83,7 @@
IIOManager ioManager = ncServiceCtx.getIoManager();
FileReference fileRef = ioManager.resolve(path);
List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(ncServiceCtx, fileRef);
+ ioOpCallbackFactory.initialize(ncServiceCtx);
return LSMRTreeUtils.createLSMTree(ioManager, virtualBufferCaches, fileRef,
storageManager.getBufferCache(ncServiceCtx), typeTraits, cmpFactories, btreeCmpFactories,
valueProviderFactories, rtreePolicyType, bloomFilterFalsePositiveRate,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
index aea205d..e807c72 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
@@ -78,6 +78,7 @@
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+ ioOpCallbackFactory.initialize(serviceCtx);
return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(ioManager, virtualBufferCaches, file,
storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, btreeComparatorFactories,
valueProviderFactories, rtreePolicyType,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 1a5121b..a20eeb7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -236,7 +236,7 @@
}
ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples);
ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
- return new MergeOperation(accessor, mergeFileRefs.getInsertIndexFileReference(), callback,
- fileManager.getBaseDir().getAbsolutePath(), cursor);
+ return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), null, null,
+ callback, fileManager.getBaseDir().getAbsolutePath());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index d8dda71..47a8046 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -60,6 +60,7 @@
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+ ioOpCallbackFactory.initialize(serviceCtx);
return TestLsmBtreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),