[NO ISSUE][STO] Ensure First Component ID is Initialized
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Initialize the component id generator from the primary
index checkpoint, if exits, as soon as it is created.
- Ensure the first component id is passed to all indexes.
Change-Id: I246f9373f950e2f9a2c63f86746462e42a3f1c62
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2948
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index adf9960..946815f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -300,7 +300,6 @@
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
ILogRecord logRecord = null;
- ILSMComponentIdGenerator idGenerator = null;
try {
logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -389,8 +388,7 @@
int partition = logRecord.getResourcePartition();
if (partitions.contains(partition)) {
int datasetId = logRecord.getDatasetId();
- idGenerator = datasetLifecycleManager.getComponentIdGenerator(datasetId, partition);
- if (idGenerator == null) {
+ if (!datasetLifecycleManager.isRegistered(datasetId)) {
// it's possible this dataset has been dropped
logRecord = logReader.next();
continue;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 017c59f..5aa1b36 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -56,6 +56,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -75,6 +76,7 @@
private static ITransactionContext txnCtx;
private static LSMInsertDeleteOperatorNodePushable insertOp;
private static final int PARTITION = 0;
+ private static String indexPath;
@BeforeClass
public static void setUp() throws Exception {
@@ -109,6 +111,7 @@
txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+ indexPath = indexDataflowHelper.getResource().getPath();
}
@After
@@ -148,9 +151,9 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
ILSMComponentId next =
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -163,8 +166,8 @@
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
- next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+ next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -216,9 +219,9 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
ILSMComponentId next =
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -248,8 +251,8 @@
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
- next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+ next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -302,9 +305,9 @@
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
ILSMComponentId next =
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -329,8 +332,8 @@
secondSearcher.waitUntilEntered();
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
- next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+ next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -738,9 +741,10 @@
public void run() {
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
- ILSMComponentId next =
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath)
+ .refresh();
+ ILSMComponentId next = dsLifecycleMgr
+ .getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
index f4c0ea6..c727f52 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
@@ -127,7 +127,7 @@
public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentIdGenerator idGenerator,
IIndexCheckpointManagerProvider checkpointManagerProvider) {
- super(dsInfo, index, idGenerator, checkpointManagerProvider);
+ super(dsInfo, index, idGenerator.getId(), checkpointManagerProvider);
lsmBtree = (TestLsmBtree) index;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 22ee542..c4390fa 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -55,12 +55,13 @@
DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition);
+ PrimaryIndexOperationTracker opTracker =
+ dslcManager.getOperationTracker(datasetId, partition, resource.getPath());
if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
- dslcManager.getComponentIdGenerator(datasetId, partition));
+ dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()));
replaceMapEntry(opTrackersField, dsr, partition, opTracker);
}
return opTracker;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 4441c6e..d18b6ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -40,6 +40,15 @@
IIndex getIndex(int datasetId, long indexId) throws HyracksDataException;
/**
+ * Indicates if the dataset with id {@code datasetId} is currently registered
+ * with this {@link IDatasetLifecycleManager}
+ *
+ * @param datasetId
+ * @return true if the dataset is currently registered. Otherwise false.
+ */
+ boolean isRegistered(int datasetId);
+
+ /**
* Flushes all open datasets synchronously.
*
* @throws HyracksDataException
@@ -76,18 +85,20 @@
*
* @param datasetId
* @param partition
+ * @param path
* @return
*/
- PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition);
+ PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path);
/**
* creates (if necessary) and returns the component Id generator of a dataset.
*
* @param datasetId
* @param partition
+ * @param path
* @return
*/
- ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition);
+ ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path);
/**
* creates (if necessary) and returns the dataset virtual buffer caches.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 6476e68..811da78 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -53,7 +53,7 @@
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- return dslcManager.getComponentIdGenerator(datasetId, partition);
+ return dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath());
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 682eaea..1dff69d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -38,7 +38,9 @@
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
@@ -113,9 +115,6 @@
datasetResource = getDatasetLifecycle(did);
}
datasetResource.register(resource, (ILSMIndex) index);
- if (((ILSMIndex) index).isPrimaryIndex()) {
- initializeDatasetPartitionValidComponentId(datasetResource, resource);
- }
}
private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -326,32 +325,36 @@
}
@Override
- public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) {
+ public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
DatasetResource dataset = datasets.get(datasetId);
PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
if (opTracker == null) {
- populateOpTrackerAndIdGenerator(dataset, partition);
+ populateOpTrackerAndIdGenerator(dataset, partition, path);
opTracker = dataset.getOpTracker(partition);
}
return opTracker;
}
@Override
- public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) {
+ public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
DatasetResource dataset = datasets.get(datasetId);
- if (dataset == null) {
- return null;
- }
ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
if (generator == null) {
- populateOpTrackerAndIdGenerator(dataset, partition);
+ populateOpTrackerAndIdGenerator(dataset, partition, path);
generator = dataset.getComponentIdGenerator(partition);
}
return generator;
}
- private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) {
- ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum());
+ @Override
+ public synchronized boolean isRegistered(int datasetId) {
+ return datasets.containsKey(datasetId);
+ }
+
+ private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition, String path) {
+ final long lastValidId = getDatasetLastValidComponentId(path);
+ ILSMComponentIdGenerator idGenerator =
+ new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(), lastValidId);
PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
logManager, dataset.getDatasetInfo(), idGenerator);
dataset.setPrimaryIndexOperationTracker(partition, opTracker);
@@ -605,21 +608,15 @@
}
}
- private void initializeDatasetPartitionValidComponentId(DatasetResource datasetResource,
- LocalResource primaryIndexResource) {
- final IndexInfo indexInfo = datasetResource.getIndexInfo(primaryIndexResource.getId());
- final int partition = indexInfo.getPartition();
- final ILSMComponentIdGenerator componentIdGenerator =
- getComponentIdGenerator(datasetResource.getDatasetID(), partition);
- final long indexLastValidComponentId = getIndexLastValidComponentId(indexInfo.getLocalResource());
- componentIdGenerator.init(indexLastValidComponentId);
- }
-
- private long getIndexLastValidComponentId(LocalResource resource) {
+ private long getDatasetLastValidComponentId(String indexPath) {
try {
- final DatasetResourceReference datasetResource = DatasetResourceReference.of(resource);
- return Math.max(indexCheckpointManagerProvider.get(datasetResource).getLatest().getLastComponentId(),
- MIN_VALID_COMPONENT_ID);
+ final ResourceReference indexRef = ResourceReference.ofIndex(indexPath);
+ final ResourceReference primaryIndexRef = indexRef.getDatasetReference();
+ final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(primaryIndexRef);
+ if (indexCheckpointManager.getCheckpointCount() > 0) {
+ return Math.max(indexCheckpointManager.getLatest().getLastComponentId(), MIN_VALID_COMPONENT_ID);
+ }
+ return MIN_VALID_COMPONENT_ID;
} catch (HyracksDataException e) {
throw new IllegalStateException(e);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 606d63c..53bc31b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -37,7 +37,6 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
@@ -69,19 +68,18 @@
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
protected final DatasetInfo dsInfo;
protected final ILSMIndex lsmIndex;
- private final ILSMComponentIdGenerator componentIdGenerator;
private long firstLsnForCurrentMemoryComponent = 0L;
private long persistenceLsn = 0L;
private int pendingFlushes = 0;
private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
private boolean firstAllocation = true;
- public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentIdGenerator componentIdGenerator,
+ public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.dsInfo = dsInfo;
this.lsmIndex = lsmIndex;
- this.componentIdGenerator = componentIdGenerator;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+ componentIds.add(componentId);
}
@Override
@@ -278,9 +276,6 @@
@Override
public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
- if (firstAllocation) {
- firstAllocation = false;
- componentIds.add(componentIdGenerator.getId());
- }
+ // no op
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
index 68ffd6a..25cd8b2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
@@ -70,8 +70,8 @@
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
- return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(),
- getIndexCheckpointManagerProvider());
+ return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
+ getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider());
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 8bd848d..a24bf72 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -38,6 +38,10 @@
protected ResourceReference() {
}
+ public static ResourceReference ofIndex(String indexPath) {
+ return of(new File(indexPath, StorageConstants.METADATA_FILE_NAME).toString());
+ }
+
public static ResourceReference of(String localResourcePath) {
ResourceReference lrr = new ResourceReference();
parse(lrr, localResourcePath);
@@ -72,6 +76,11 @@
return Paths.get(root, partition, dataverse, dataset, rebalance, index);
}
+ public ResourceReference getDatasetReference() {
+ return ResourceReference
+ .ofIndex(Paths.get(root, partition, dataverse, dataset, rebalance, dataset).toFile().getPath());
+ }
+
public Path getFileRelativePath() {
return Paths.get(root, partition, dataverse, dataset, rebalance, index, name);
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 4b18e1d..db0911b 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -78,10 +78,9 @@
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
DatasetInfo dsInfo = new DatasetInfo(101, null);
- LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
- idGenerator.init(MIN_VALID_COMPONENT_ID);
- LSMIOOperationCallback callback =
- new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
+ LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
+ LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
+ mockIndexCheckpointManagerProvider());
//Flush first
idGenerator.refresh();
long flushLsn = 1L;
@@ -138,19 +137,19 @@
public void testAllocateComponentId() throws HyracksDataException {
int numMemoryComponents = 2;
DatasetInfo dsInfo = new DatasetInfo(101, null);
- ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
- idGenerator.init(MIN_VALID_COMPONENT_ID);
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMIOOperationCallback callback =
- new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
- callback.allocated(mockComponent);
+ LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
+ mockIndexCheckpointManagerProvider());
ILSMComponentId initialId = idGenerator.getId();
+ // simulate a partition is flushed before allocated
idGenerator.refresh();
long flushLsn = 1L;
ILSMComponentId nextComponentId = idGenerator.getId();
+ callback.allocated(mockComponent);
callback.recycled(mockComponent);
checkMemoryComponent(initialId, mockComponent);
}
@@ -159,16 +158,14 @@
public void testRecycleComponentId() throws HyracksDataException {
int numMemoryComponents = 2;
DatasetInfo dsInfo = new DatasetInfo(101, null);
- ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
- idGenerator.init(MIN_VALID_COMPONENT_ID);
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMIOOperationCallback callback =
- new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
+ LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
+ mockIndexCheckpointManagerProvider());
String indexId = "mockIndexId";
- callback.allocated(mockComponent);
ILSMComponentId id = idGenerator.getId();
callback.recycled(mockComponent);
checkMemoryComponent(id, mockComponent);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index e353714..ab84211 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -49,7 +49,7 @@
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- return dslcManager.getOperationTracker(datasetId, partition);
+ return dslcManager.getOperationTracker(datasetId, partition, resource.getPath());
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
index 0af06d7..a5c6360 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -38,11 +38,4 @@
* @return the index of the current memory component
*/
int getCurrentComponentIndex();
-
- /**
- * Initializes this {@link ILSMComponentIdGenerator} by setting the last used id
- *
- * @param lastUsedId
- */
- void init(long lastUsedId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
index e6bf0ab..21a27a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -33,25 +33,16 @@
private int currentComponentIndex;
private long lastUsedId;
private ILSMComponentId componentId;
- private boolean initialized = false;
- public LSMComponentIdGenerator(int numComponents) {
+ public LSMComponentIdGenerator(int numComponents, long lastUsedId) {
this.numComponents = numComponents;
- }
-
- @Override
- public synchronized void init(long lastUsedId) {
this.lastUsedId = lastUsedId;
- initialized = true;
refresh();
currentComponentIndex = 0;
}
@Override
public synchronized void refresh() {
- if (!initialized) {
- throw new IllegalStateException("Attempt to refresh component id before initialziation.");
- }
final long nextId = ++lastUsedId;
componentId = new LSMComponentId(nextId, nextId);
currentComponentIndex = (currentComponentIndex + 1) % numComponents;
@@ -59,9 +50,6 @@
@Override
public synchronized ILSMComponentId getId() {
- if (!initialized) {
- throw new IllegalStateException("Attempt to get component id before initialziation.");
- }
return componentId;
}