[NO ISSUE][STO] Ensure First Component ID is Initialized

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Initialize the component id generator from the primary
  index checkpoint, if exits, as soon as it is created.
- Ensure the first component id is passed to all indexes.

Change-Id: I246f9373f950e2f9a2c63f86746462e42a3f1c62
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2948
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index adf9960..946815f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -300,7 +300,6 @@
         TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
 
         ILogRecord logRecord = null;
-        ILSMComponentIdGenerator idGenerator = null;
         try {
             logReader.setPosition(lowWaterMarkLSN);
             logRecord = logReader.next();
@@ -389,8 +388,7 @@
                         int partition = logRecord.getResourcePartition();
                         if (partitions.contains(partition)) {
                             int datasetId = logRecord.getDatasetId();
-                            idGenerator = datasetLifecycleManager.getComponentIdGenerator(datasetId, partition);
-                            if (idGenerator == null) {
+                            if (!datasetLifecycleManager.isRegistered(datasetId)) {
                                 // it's possible this dataset has been dropped
                                 logRecord = logReader.next();
                                 continue;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 017c59f..5aa1b36 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -56,6 +56,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -75,6 +76,7 @@
     private static ITransactionContext txnCtx;
     private static LSMInsertDeleteOperatorNodePushable insertOp;
     private static final int PARTITION = 0;
+    private static String indexPath;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -109,6 +111,7 @@
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+        indexPath = indexDataflowHelper.getResource().getPath();
     }
 
     @After
@@ -148,9 +151,9 @@
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
             ILSMComponentId next =
-                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -163,8 +166,8 @@
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
-            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -216,9 +219,9 @@
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
             ILSMComponentId next =
-                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -248,8 +251,8 @@
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
-            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -302,9 +305,9 @@
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
             ILSMComponentId next =
-                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -329,8 +332,8 @@
             secondSearcher.waitUntilEntered();
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
-            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -738,9 +741,10 @@
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                     try {
-                        dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
-                        ILSMComponentId next =
-                                dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+                        dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath)
+                                .refresh();
+                        ILSMComponentId next = dsLifecycleMgr
+                                .getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
                         long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
                         Map<String, Object> flushMap = new HashMap<>();
                         flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
index f4c0ea6..c727f52 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
@@ -127,7 +127,7 @@
 
         public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentIdGenerator idGenerator,
                 IIndexCheckpointManagerProvider checkpointManagerProvider) {
-            super(dsInfo, index, idGenerator, checkpointManagerProvider);
+            super(dsInfo, index, idGenerator.getId(), checkpointManagerProvider);
             lsmBtree = (TestLsmBtree) index;
         }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 22ee542..c4390fa 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -55,12 +55,13 @@
             DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
             DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
             int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
-            PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition);
+            PrimaryIndexOperationTracker opTracker =
+                    dslcManager.getOperationTracker(datasetId, partition, resource.getPath());
             if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
                 Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
                 opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
                         appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
-                        dslcManager.getComponentIdGenerator(datasetId, partition));
+                        dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()));
                 replaceMapEntry(opTrackersField, dsr, partition, opTracker);
             }
             return opTracker;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 4441c6e..d18b6ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -40,6 +40,15 @@
     IIndex getIndex(int datasetId, long indexId) throws HyracksDataException;
 
     /**
+     * Indicates if the dataset with id {@code datasetId} is currently registered
+     * with this {@link IDatasetLifecycleManager}
+     *
+     * @param datasetId
+     * @return true if the dataset is currently registered. Otherwise false.
+     */
+    boolean isRegistered(int datasetId);
+
+    /**
      * Flushes all open datasets synchronously.
      *
      * @throws HyracksDataException
@@ -76,18 +85,20 @@
      *
      * @param datasetId
      * @param partition
+     * @param path
      * @return
      */
-    PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition);
+    PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path);
 
     /**
      * creates (if necessary) and returns the component Id generator of a dataset.
      *
      * @param datasetId
      * @param partition
+     * @param path
      * @return
      */
-    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition);
+    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path);
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 6476e68..811da78 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -53,7 +53,7 @@
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
         int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
-        return dslcManager.getComponentIdGenerator(datasetId, partition);
+        return dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath());
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 682eaea..1dff69d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -38,7 +38,9 @@
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
@@ -113,9 +115,6 @@
             datasetResource = getDatasetLifecycle(did);
         }
         datasetResource.register(resource, (ILSMIndex) index);
-        if (((ILSMIndex) index).isPrimaryIndex()) {
-            initializeDatasetPartitionValidComponentId(datasetResource, resource);
-        }
     }
 
     private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -326,32 +325,36 @@
     }
 
     @Override
-    public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) {
+    public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
         DatasetResource dataset = datasets.get(datasetId);
         PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
         if (opTracker == null) {
-            populateOpTrackerAndIdGenerator(dataset, partition);
+            populateOpTrackerAndIdGenerator(dataset, partition, path);
             opTracker = dataset.getOpTracker(partition);
         }
         return opTracker;
     }
 
     @Override
-    public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) {
+    public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
         DatasetResource dataset = datasets.get(datasetId);
-        if (dataset == null) {
-            return null;
-        }
         ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
         if (generator == null) {
-            populateOpTrackerAndIdGenerator(dataset, partition);
+            populateOpTrackerAndIdGenerator(dataset, partition, path);
             generator = dataset.getComponentIdGenerator(partition);
         }
         return generator;
     }
 
-    private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) {
-        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum());
+    @Override
+    public synchronized boolean isRegistered(int datasetId) {
+        return datasets.containsKey(datasetId);
+    }
+
+    private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition, String path) {
+        final long lastValidId = getDatasetLastValidComponentId(path);
+        ILSMComponentIdGenerator idGenerator =
+                new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(), lastValidId);
         PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
                 logManager, dataset.getDatasetInfo(), idGenerator);
         dataset.setPrimaryIndexOperationTracker(partition, opTracker);
@@ -605,21 +608,15 @@
         }
     }
 
-    private void initializeDatasetPartitionValidComponentId(DatasetResource datasetResource,
-            LocalResource primaryIndexResource) {
-        final IndexInfo indexInfo = datasetResource.getIndexInfo(primaryIndexResource.getId());
-        final int partition = indexInfo.getPartition();
-        final ILSMComponentIdGenerator componentIdGenerator =
-                getComponentIdGenerator(datasetResource.getDatasetID(), partition);
-        final long indexLastValidComponentId = getIndexLastValidComponentId(indexInfo.getLocalResource());
-        componentIdGenerator.init(indexLastValidComponentId);
-    }
-
-    private long getIndexLastValidComponentId(LocalResource resource) {
+    private long getDatasetLastValidComponentId(String indexPath) {
         try {
-            final DatasetResourceReference datasetResource = DatasetResourceReference.of(resource);
-            return Math.max(indexCheckpointManagerProvider.get(datasetResource).getLatest().getLastComponentId(),
-                    MIN_VALID_COMPONENT_ID);
+            final ResourceReference indexRef = ResourceReference.ofIndex(indexPath);
+            final ResourceReference primaryIndexRef = indexRef.getDatasetReference();
+            final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(primaryIndexRef);
+            if (indexCheckpointManager.getCheckpointCount() > 0) {
+                return Math.max(indexCheckpointManager.getLatest().getLastComponentId(), MIN_VALID_COMPONENT_ID);
+            }
+            return MIN_VALID_COMPONENT_ID;
         } catch (HyracksDataException e) {
             throw new IllegalStateException(e);
         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 606d63c..53bc31b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -37,7 +37,6 @@
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
@@ -69,19 +68,18 @@
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     protected final DatasetInfo dsInfo;
     protected final ILSMIndex lsmIndex;
-    private final ILSMComponentIdGenerator componentIdGenerator;
     private long firstLsnForCurrentMemoryComponent = 0L;
     private long persistenceLsn = 0L;
     private int pendingFlushes = 0;
     private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
     private boolean firstAllocation = true;
 
-    public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentIdGenerator componentIdGenerator,
+    public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId,
             IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.dsInfo = dsInfo;
         this.lsmIndex = lsmIndex;
-        this.componentIdGenerator = componentIdGenerator;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+        componentIds.add(componentId);
     }
 
     @Override
@@ -278,9 +276,6 @@
 
     @Override
     public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
-        if (firstAllocation) {
-            firstAllocation = false;
-            componentIds.add(componentIdGenerator.getId());
-        }
+        // no op
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
index 68ffd6a..25cd8b2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
@@ -70,8 +70,8 @@
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
-        return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(),
-                getIndexCheckpointManagerProvider());
+        return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
+                getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider());
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 8bd848d..a24bf72 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -38,6 +38,10 @@
     protected ResourceReference() {
     }
 
+    public static ResourceReference ofIndex(String indexPath) {
+        return of(new File(indexPath, StorageConstants.METADATA_FILE_NAME).toString());
+    }
+
     public static ResourceReference of(String localResourcePath) {
         ResourceReference lrr = new ResourceReference();
         parse(lrr, localResourcePath);
@@ -72,6 +76,11 @@
         return Paths.get(root, partition, dataverse, dataset, rebalance, index);
     }
 
+    public ResourceReference getDatasetReference() {
+        return ResourceReference
+                .ofIndex(Paths.get(root, partition, dataverse, dataset, rebalance, dataset).toFile().getPath());
+    }
+
     public Path getFileRelativePath() {
         return Paths.get(root, partition, dataverse, dataset, rebalance, index, name);
     }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 4b18e1d..db0911b 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -78,10 +78,9 @@
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
         DatasetInfo dsInfo = new DatasetInfo(101, null);
-        LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
-        idGenerator.init(MIN_VALID_COMPONENT_ID);
-        LSMIOOperationCallback callback =
-                new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
+        LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
+        LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
+                mockIndexCheckpointManagerProvider());
         //Flush first
         idGenerator.refresh();
         long flushLsn = 1L;
@@ -138,19 +137,19 @@
     public void testAllocateComponentId() throws HyracksDataException {
         int numMemoryComponents = 2;
         DatasetInfo dsInfo = new DatasetInfo(101, null);
-        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
-        idGenerator.init(MIN_VALID_COMPONENT_ID);
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
-        LSMIOOperationCallback callback =
-                new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
-        callback.allocated(mockComponent);
+        LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
+                mockIndexCheckpointManagerProvider());
         ILSMComponentId initialId = idGenerator.getId();
+        // simulate a partition is flushed before allocated
         idGenerator.refresh();
         long flushLsn = 1L;
         ILSMComponentId nextComponentId = idGenerator.getId();
+        callback.allocated(mockComponent);
         callback.recycled(mockComponent);
         checkMemoryComponent(initialId, mockComponent);
     }
@@ -159,16 +158,14 @@
     public void testRecycleComponentId() throws HyracksDataException {
         int numMemoryComponents = 2;
         DatasetInfo dsInfo = new DatasetInfo(101, null);
-        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
-        idGenerator.init(MIN_VALID_COMPONENT_ID);
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
-        LSMIOOperationCallback callback =
-                new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
+        LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
+                mockIndexCheckpointManagerProvider());
         String indexId = "mockIndexId";
-        callback.allocated(mockComponent);
         ILSMComponentId id = idGenerator.getId();
         callback.recycled(mockComponent);
         checkMemoryComponent(id, mockComponent);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index e353714..ab84211 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -49,7 +49,7 @@
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
         int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
-        return dslcManager.getOperationTracker(datasetId, partition);
+        return dslcManager.getOperationTracker(datasetId, partition, resource.getPath());
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
index 0af06d7..a5c6360 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -38,11 +38,4 @@
      * @return the index of the current memory component
      */
     int getCurrentComponentIndex();
-
-    /**
-     * Initializes this {@link ILSMComponentIdGenerator} by setting the last used id
-     *
-     * @param lastUsedId
-     */
-    void init(long lastUsedId);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
index e6bf0ab..21a27a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -33,25 +33,16 @@
     private int currentComponentIndex;
     private long lastUsedId;
     private ILSMComponentId componentId;
-    private boolean initialized = false;
 
-    public LSMComponentIdGenerator(int numComponents) {
+    public LSMComponentIdGenerator(int numComponents, long lastUsedId) {
         this.numComponents = numComponents;
-    }
-
-    @Override
-    public synchronized void init(long lastUsedId) {
         this.lastUsedId = lastUsedId;
-        initialized = true;
         refresh();
         currentComponentIndex = 0;
     }
 
     @Override
     public synchronized void refresh() {
-        if (!initialized) {
-            throw new IllegalStateException("Attempt to refresh component id before initialziation.");
-        }
         final long nextId = ++lastUsedId;
         componentId = new LSMComponentId(nextId, nextId);
         currentComponentIndex = (currentComponentIndex + 1) % numComponents;
@@ -59,9 +50,6 @@
 
     @Override
     public synchronized ILSMComponentId getId() {
-        if (!initialized) {
-            throw new IllegalStateException("Attempt to get component id before initialziation.");
-        }
         return componentId;
     }