[ASTERIXDB-2115] Add Component Ids to LSM Indexes

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Add LSMComponentId to all LSM components. Component Ids are managed
through IO operation callbacks.
- For memory component, it's ID is reset every time it's recycled.
- For disk component, it's ID is copied from the source component(s)
during flush/merge
- For indexes of a dataset, we need to guarantee all their memory
components should recieve the same ID. This is achieved using a shared
component Id generator.
- Fix memory component recycled callback, make sure it's called only
when we've indeed recycled the memory component

A design wiki for this patch: https://cwiki.apache.org/confluence/display/
ASTERIXDB/Component+Id-based+secondary-to-primary+index+acceleration

Change-Id: I8aec6261a84a0729ce35f4b1cb708be299ddb98d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2025
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d6457f2..494eb65 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -993,8 +993,11 @@
             // #. create the index artifact in NC.
             runJob(hcc, spec, jobFlags);
 
-            // #. flush the internal dataset for correlated policy
-            if (ds.isCorrelated() && ds.getDatasetType() == DatasetType.INTERNAL) {
+            // #. flush the internal dataset
+            // We need this to guarantee the correctness of component Id acceleration for secondary-to-primary index.
+            // Otherwise, the new secondary index component would corresponding to a partial memory component
+            // of the primary index, which is incorrect.
+            if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 FlushDatasetUtil.flushDataset(hcc, metadataProvider, index.getDataverseName(), index.getDatasetName());
             }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 6bff50d..00b185d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -197,6 +197,7 @@
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
             searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
@@ -204,6 +205,7 @@
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             lsmAccessor.deleteComponents(pred);
             searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
@@ -246,6 +248,7 @@
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             // rollback a memory component
             lsmAccessor.deleteComponents(memoryComponentsPredicate);
             searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
@@ -270,6 +273,7 @@
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             lsmAccessor.deleteComponents(pred);
             searchAndAssertCount(nc, ctx, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
         } catch (Throwable e) {
@@ -316,6 +320,7 @@
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             // rollback a memory component
             lsmAccessor.deleteComponents(
                     c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
@@ -335,6 +340,7 @@
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
             DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
+            dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
             lsmAccessor.deleteComponents(pred);
             // now that the rollback has completed, we will unblock the search
             lsmBtree.addSearchCallback(sem -> sem.release());
@@ -731,7 +737,7 @@
     }
 
     private class Rollerback {
-        private Thread task;
+        private final Thread task;
         private Exception failure;
 
         public Rollerback(TestLsmBtree lsmBtree, Predicate<ILSMComponent> predicate) {
@@ -740,6 +746,7 @@
                 @Override
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                    dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
                     try {
                         lsmAccessor.deleteComponents(predicate);
                     } catch (HyracksDataException e) {
@@ -760,7 +767,7 @@
     }
 
     private class Searcher {
-        private ExecutorService executor = Executors.newSingleThreadExecutor();
+        private final ExecutorService executor = Executors.newSingleThreadExecutor();
         private Future<Boolean> task;
         private volatile boolean entered = false;
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index 893b428..e0502de 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -62,6 +63,6 @@
 
     @Override
     public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException {
-        return TestLsmBtreeIoOpCallbackFactory.INSTANCE;
+        return new TestLsmBtreeIoOpCallbackFactory(new DatasetLSMComponentIdGeneratorFactory(getDatasetId()));
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index 142bcc5..fa37c20 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -18,19 +18,20 @@
  */
 package org.apache.asterix.test.dataflow;
 
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
 
-public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class TestLsmBtreeIoOpCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public static TestLsmBtreeIoOpCallbackFactory INSTANCE = new TestLsmBtreeIoOpCallbackFactory();
     private static volatile int completedFlushes = 0;
     private static volatile int completedMerges = 0;
     private static volatile int rollbackFlushes = 0;
@@ -38,7 +39,8 @@
     private static volatile int failedFlushes = 0;
     private static volatile int failedMerges = 0;
 
-    private TestLsmBtreeIoOpCallbackFactory() {
+    public TestLsmBtreeIoOpCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+        super(idGeneratorFactory);
     }
 
     @Override
@@ -50,7 +52,7 @@
         // Whenever this is called, it resets the counter
         // However, the counters for the failed operations are never reset since we expect them
         // To be always 0
-        return new TestLsmBtreeIoOpCallback(index);
+        return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator());
     }
 
     public int getTotalFlushes() {
@@ -90,14 +92,14 @@
     }
 
     public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
-        public TestLsmBtreeIoOpCallback(ILSMIndex index) {
-            super(index);
+        public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
+            super(index, idGenerator);
         }
 
         @Override
         public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
             super.afterFinalize(opType, newComponent);
-            synchronized (INSTANCE) {
+            synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
                 if (newComponent != null) {
                     if (newComponent == EmptyComponent.INSTANCE) {
                         if (opType == LSMIOOperationType.FLUSH) {
@@ -115,7 +117,7 @@
                 } else {
                     recordFailure(opType);
                 }
-                INSTANCE.notifyAll();
+                TestLsmBtreeIoOpCallbackFactory.this.notifyAll();
             }
         }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index a32d4dc..41c5ade 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
@@ -79,6 +80,14 @@
     PrimaryIndexOperationTracker getOperationTracker(int datasetId);
 
     /**
+     * creates (if necessary) and returns the component Id generator of a dataset.
+     *
+     * @param datasetId
+     * @return
+     */
+    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId);
+
+    /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
      *
      * @param datasetId
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index b877cfb..e5fc998 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -29,11 +29,14 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 
 public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy {
 
@@ -86,27 +89,26 @@
             //nothing to merge
             return false;
         }
-        long minID = immutableComponents.get(mergeableIndexes.getLeft()).getComponentId().getMinId();
-        long maxID = immutableComponents.get(mergeableIndexes.getRight()).getComponentId().getMaxId();
-
+        ILSMComponent leftComponent = immutableComponents.get(mergeableIndexes.getLeft());
+        ILSMComponent rightComponent = immutableComponents.get(mergeableIndexes.getRight());
+        ILSMComponentId targetId = LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId());
         Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
         int partition = getIndexPartition(index, indexInfos);
-        triggerScheduledMerge(minID, maxID,
+        triggerScheduledMerge(targetId,
                 indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet()));
         return true;
     }
 
     /**
-     * Submit merge requests for all disk components within [minID, maxID]
+     * Submit merge requests for all disk components within the range specified by targetId
      * of all indexes of a given dataset in the given partition
      *
-     * @param minID
-     * @param maxID
-     * @param partition
+     * @param targetId
      * @param indexInfos
      * @throws HyracksDataException
      */
-    private void triggerScheduledMerge(long minID, long maxID, Set<IndexInfo> indexInfos) throws HyracksDataException {
+    private void triggerScheduledMerge(ILSMComponentId targetId, Set<IndexInfo> indexInfos)
+            throws HyracksDataException {
         for (IndexInfo info : indexInfos) {
             ILSMIndex lsmIndex = info.getIndex();
 
@@ -116,13 +118,13 @@
             }
             List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
             for (ILSMDiskComponent component : immutableComponents) {
-                ILSMDiskComponentId id = component.getComponentId();
-                if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
+                ILSMComponentId id = component.getId();
+                IdCompareResult cmp = targetId.compareTo(id);
+                if (cmp == IdCompareResult.INCLUDE) {
                     mergableComponents.add(component);
-                }
-                if (id.getMaxId() < minID) {
+                } else if (cmp == IdCompareResult.GREATER_THAN) {
                     //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs)
-                    //if the component.maxID < minID, we can safely skip the rest disk components in the list
+                    // if targetId>component.Id, we can safely skip the rest disk components in the list
                     break;
                 }
             }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..7b8397c
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.context;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+
+/**
+ * This factory implementation is used by AsterixDB layer so that indexes of a dataset (/partition)
+ * use the same Id generator. This guarantees their memory components would receive the same Id upon
+ * activation.
+ *
+ */
+public class DatasetLSMComponentIdGeneratorFactory implements ILSMComponentIdGeneratorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final int datasetId;
+
+    public DatasetLSMComponentIdGeneratorFactory(int datasetId) {
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+        IDatasetLifecycleManager dslcManager =
+                ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
+        return dslcManager.getComponentIdGenerator(datasetId);
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 6282509..1e99fb5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -43,11 +43,13 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
@@ -233,10 +235,12 @@
             dsr = datasets.get(did);
             if (dsr == null) {
                 DatasetInfo dsInfo = new DatasetInfo(did);
-                PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo);
+                ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+                PrimaryIndexOperationTracker opTracker =
+                        new PrimaryIndexOperationTracker(did, logManager, dsInfo, idGenerator);
                 DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
                         memoryManager.getNumPages(did), numPartitions);
-                dsr = new DatasetResource(dsInfo, opTracker, vbcs);
+                dsr = new DatasetResource(dsInfo, opTracker, vbcs, idGenerator);
                 datasets.put(did, dsr);
             }
             return dsr;
@@ -319,6 +323,11 @@
         return datasets.get(datasetId).getOpTracker();
     }
 
+    @Override
+    public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) {
+        return datasets.get(datasetId).getIdGenerator();
+    }
+
     private void validateDatasetLifecycleManagerState() throws HyracksDataException {
         if (stopped) {
             throw new HyracksDataException(DatasetLifecycleManager.class.getSimpleName() + " was stopped.");
@@ -404,6 +413,9 @@
             }
         }
 
+        ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
+        idGenerator.refresh();
+
         if (asyncFlush) {
             for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
                 ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 79ae1da..f6e2b0d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.LocalResource;
 
@@ -42,12 +43,15 @@
     private final DatasetInfo datasetInfo;
     private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
     private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
+    private final ILSMComponentIdGenerator datasetComponentIdGenerator;
 
     public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker,
-            DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
+            DatasetVirtualBufferCaches datasetVirtualBufferCaches,
+            ILSMComponentIdGenerator datasetComponentIdGenerator) {
         this.datasetInfo = datasetInfo;
         this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
         this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
+        this.datasetComponentIdGenerator = datasetComponentIdGenerator;
     }
 
     public boolean isRegistered() {
@@ -116,6 +120,10 @@
         return datasetPrimaryOpTracker;
     }
 
+    public ILSMComponentIdGenerator getIdGenerator() {
+        return datasetComponentIdGenerator;
+    }
+
     @Override
     public int compareTo(DatasetResource o) {
         return datasetInfo.compareTo(o.datasetInfo);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 01e33a7..6f35a3d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
@@ -45,13 +46,16 @@
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
     private final ILogManager logManager;
+    private final ILSMComponentIdGenerator idGenerator;
     private boolean flushOnExit = false;
     private boolean flushLogCreated = false;
 
-    public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo) {
+    public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+            ILSMComponentIdGenerator idGenerator) {
         super(datasetID, dsInfo);
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
+        this.idGenerator = idGenerator;
     }
 
     @Override
@@ -142,6 +146,7 @@
 
     //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
     public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
+        idGenerator.refresh();
         for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
             //get resource
             ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 68f42e7..e445fe4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -20,27 +20,25 @@
 package org.apache.asterix.common.ioopcallbacks;
 
 import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 
 // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
 public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
-    private static final Logger LOGGER = Logger.getLogger(AbstractLSMIOOperationCallback.class.getName());
     public static final MutableArrayValueReference LSN_KEY = new MutableArrayValueReference("LSN".getBytes());
     public static final long INVALID = -1L;
 
@@ -56,8 +54,11 @@
     // Index of the currently being written to component
     protected int writeIndex;
 
-    public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex) {
+    protected final ILSMComponentIdGenerator idGenerator;
+
+    public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
         this.lsmIndex = lsmIndex;
+        this.idGenerator = idGenerator;
         int count = lsmIndex.getNumberOfAllMemoryComponents();
         mutableLastLSNs = new long[count];
         firstLSNs = new long[count];
@@ -114,40 +115,22 @@
         return pointable.getLength() == 0 ? INVALID : pointable.longValue();
     }
 
-    private ILSMDiskComponentId getComponentId(List<ILSMComponent> oldComponents) throws HyracksDataException {
-        if (oldComponents == null) {
-            //if oldComponents == null, then getComponentLSN would treat it as a flush operation,
-            //and return the LSN for the flushed component
-            long id = getComponentLSN(null);
-            if (id == 0) {
-                LOGGER.log(Level.WARNING, "Flushing a memory component without setting the LSN");
-                id = ILSMDiskComponentId.NOT_FOUND;
-            }
-            return new LSMDiskComponentId(id, id);
-        } else {
-            long minId = Long.MAX_VALUE;
-            long maxId = Long.MIN_VALUE;
-            for (ILSMComponent oldComponent : oldComponents) {
-                ILSMDiskComponentId oldComponentId = ((ILSMDiskComponent) oldComponent).getComponentId();
-                if (oldComponentId.getMinId() < minId) {
-                    minId = oldComponentId.getMinId();
-                }
-                if (oldComponentId.getMaxId() > maxId) {
-                    maxId = oldComponentId.getMaxId();
-                }
-            }
-            return new LSMDiskComponentId(minId, maxId);
+    private ILSMComponentId getMergedComponentId(List<ILSMComponent> mergedComponents) throws HyracksDataException {
+        if (mergedComponents == null || mergedComponents.isEmpty()) {
+            return null;
         }
+        return LSMComponentIdUtils.union(mergedComponents.get(0).getId(),
+                mergedComponents.get(mergedComponents.size() - 1).getId());
+
     }
 
-    private void putComponentIdIntoMetadata(ILSMDiskComponent component, List<ILSMComponent> oldComponents)
-            throws HyracksDataException {
-        DiskComponentMetadata metadata = component.getMetadata();
-        ILSMDiskComponentId componentId = getComponentId(oldComponents);
-        metadata.put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
-                LongPointable.FACTORY.createPointable(componentId.getMinId()));
-        metadata.put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
-                LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+    private void putComponentIdIntoMetadata(LSMIOOperationType opType, ILSMDiskComponent newComponent,
+            List<ILSMComponent> oldComponents) throws HyracksDataException {
+        // the id of flushed component is set when we copy the metadata of the memory component
+        if (opType == LSMIOOperationType.MERGE) {
+            ILSMComponentId componentId = getMergedComponentId(oldComponents);
+            LSMComponentIdUtils.persist(componentId, newComponent.getMetadata());
+        }
     }
 
     public synchronized void updateLastLSN(long lastLSN) {
@@ -188,7 +171,7 @@
         //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
         if (newComponent != null) {
             putLSNIntoMetadata(newComponent, oldComponents);
-            putComponentIdIntoMetadata(newComponent, oldComponents);
+            putComponentIdIntoMetadata(opType, newComponent, oldComponents);
             if (opType == LSMIOOperationType.MERGE) {
                 // In case of merge, oldComponents are never null
                 LongPointable markerLsn =
@@ -196,7 +179,6 @@
                                 ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
                 newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
             }
-
         }
     }
 
@@ -220,12 +202,12 @@
 
     @Override
     public void recycled(ILSMMemoryComponent component) throws HyracksDataException {
-        // No op
+        component.resetId(idGenerator.getId());
     }
 
     @Override
     public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
-        // No op
+        component.resetId(idGenerator.getId());
     }
 
     /**
@@ -237,4 +219,5 @@
      */
     public abstract long getComponentFileLSNOffset(ILSMDiskComponent component, String componentFilePath)
             throws HyracksDataException;
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
new file mode 100644
index 0000000..16447fd
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+
+public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final ILSMComponentIdGeneratorFactory idGeneratorFactory;
+
+    protected transient INCServiceContext ncCtx;
+
+    public AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+        this.idGeneratorFactory = idGeneratorFactory;
+    }
+
+    @Override
+    public void initialize(INCServiceContext ncCtx) {
+        this.ncCtx = ncCtx;
+    }
+
+    protected ILSMComponentIdGenerator getComponentIdGenerator() {
+        assert ncCtx != null;
+        return idGeneratorFactory.getComponentIdGenerator(ncCtx);
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index c7fbb65..c1ee03b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -23,13 +23,14 @@
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMBTreeIOOperationCallback(ILSMIndex index) {
-        super(index);
+    public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
+        super(index, idGenerator);
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index e3abb6b..4ef12ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -19,21 +19,20 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
-public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class LSMBTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public static LSMBTreeIOOperationCallbackFactory INSTANCE = new LSMBTreeIOOperationCallbackFactory();
-
-    private LSMBTreeIOOperationCallbackFactory() {
+    public LSMBTreeIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+        super(idGeneratorFactory);
     }
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMBTreeIOOperationCallback(index);
+        return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator());
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index 67d623a..b43fb2f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -22,13 +22,14 @@
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex) {
-        super(lsmIndex);
+    public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
+        super(lsmIndex, idGenerator);
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 93f505c..6727bf6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -18,22 +18,20 @@
  */
 package org.apache.asterix.common.ioopcallbacks;
 
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
-public class LSMBTreeWithBuddyIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class LSMBTreeWithBuddyIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public static final LSMBTreeWithBuddyIOOperationCallbackFactory INSTANCE =
-            new LSMBTreeWithBuddyIOOperationCallbackFactory();
-
-    private LSMBTreeWithBuddyIOOperationCallbackFactory() {
+    public LSMBTreeWithBuddyIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+        super(idGeneratorFactory);
     }
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMBTreeWithBuddyIOOperationCallback(index);
+        return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator());
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 2d27b78..015cd38 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -21,6 +21,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
@@ -28,8 +29,8 @@
 
 public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMInvertedIndexIOOperationCallback(ILSMIndex index) {
-        super(index);
+    public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
+        super(index, idGenerator);
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index 47a67b2..a2712d1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -19,22 +19,20 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
-public class LSMInvertedIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class LSMInvertedIndexIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public static final LSMInvertedIndexIOOperationCallbackFactory INSTANCE =
-            new LSMInvertedIndexIOOperationCallbackFactory();
-
-    private LSMInvertedIndexIOOperationCallbackFactory() {
+    public LSMInvertedIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+        super(idGeneratorFactory);
     }
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMInvertedIndexIOOperationCallback(index);
+        return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator());
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 9ba99f9..bc79074 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -21,6 +21,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
@@ -28,8 +29,8 @@
 
 public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMRTreeIOOperationCallback(ILSMIndex index) {
-        super(index);
+    public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
+        super(index, idGenerator);
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 14cf648..087aaae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -19,21 +19,20 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
-public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+public class LSMRTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public static final LSMRTreeIOOperationCallbackFactory INSTANCE = new LSMRTreeIOOperationCallbackFactory();
-
-    private LSMRTreeIOOperationCallbackFactory() {
+    public LSMRTreeIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
+        super(idGeneratorFactory);
     }
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMRTreeIOOperationCallback(index);
+        return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator());
     }
 }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 01f08db..2928d90 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -33,13 +33,13 @@
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.junit.Assert;
 import org.junit.Test;
@@ -62,14 +62,13 @@
     @Test
     public void testBasic() {
         try {
-            List<ILSMDiskComponentId> componentIDs =
-                    Arrays.asList(new LSMDiskComponentId(5, 5), new LSMDiskComponentId(4, 4),
-                            new LSMDiskComponentId(3, 3), new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1));
+            List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(5, 5), new LSMComponentId(4, 4),
+                    new LSMComponentId(3, 3), new LSMComponentId(2, 2), new LSMComponentId(1, 1));
 
-            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>();
             IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
 
-            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>();
             IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
 
             ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
@@ -79,10 +78,10 @@
 
             policy.diskComponentAdded(primary.getIndex(), false);
 
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3),
-                    new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultPrimaryIDs);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(4, 4), new LSMDiskComponentId(3, 3),
-                    new LSMDiskComponentId(2, 2), new LSMDiskComponentId(1, 1)), resultSecondaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMComponentId(4, 4), new LSMComponentId(3, 3),
+                    new LSMComponentId(2, 2), new LSMComponentId(1, 1)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMComponentId(4, 4), new LSMComponentId(3, 3),
+                    new LSMComponentId(2, 2), new LSMComponentId(1, 1)), resultSecondaryIDs);
 
         } catch (HyracksDataException e) {
             Assert.fail(e.getMessage());
@@ -93,14 +92,13 @@
     @Test
     public void testIDIntervals() {
         try {
-            List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
-                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
-                    new LSMDiskComponentId(10, 19));
+            List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35),
+                    new LSMComponentId(25, 29), new LSMComponentId(20, 24), new LSMComponentId(10, 19));
 
-            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>();
             IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
 
-            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>();
             IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
 
             ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
@@ -110,10 +108,10 @@
 
             policy.diskComponentAdded(primary.getIndex(), false);
 
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
-                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
-                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+                    new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+                    new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultSecondaryIDs);
 
         } catch (HyracksDataException e) {
             Assert.fail(e.getMessage());
@@ -123,15 +121,15 @@
     @Test
     public void testSecondaryMissing() {
         try {
-            List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
-                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
-                    new LSMDiskComponentId(10, 19));
-            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            List<ILSMComponentId> primaryComponentIDs =
+                    Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+                            new LSMComponentId(20, 24), new LSMComponentId(10, 19));
+            List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>();
             IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
 
-            List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
-                    new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24));
-            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            List<ILSMComponentId> secondaryComponentIDs =
+                    Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), new LSMComponentId(20, 24));
+            List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>();
             IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
 
             ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
@@ -141,10 +139,11 @@
             Assert.assertTrue(resultSecondaryIDs.isEmpty());
 
             policy.diskComponentAdded(primary.getIndex(), false);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
-                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
-                    new LSMDiskComponentId(20, 24)), resultSecondaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+                    new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs);
+            Assert.assertEquals(
+                    Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29), new LSMComponentId(20, 24)),
+                    resultSecondaryIDs);
 
         } catch (HyracksDataException e) {
             Assert.fail(e.getMessage());
@@ -154,17 +153,16 @@
     @Test
     public void testMultiPartition() {
         try {
-            List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
-                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
-                    new LSMDiskComponentId(10, 19));
+            List<ILSMComponentId> componentIDs = Arrays.asList(new LSMComponentId(40, 50), new LSMComponentId(30, 35),
+                    new LSMComponentId(25, 29), new LSMComponentId(20, 24), new LSMComponentId(10, 19));
 
-            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
+            List<ILSMComponentId> resultPrimaryIDs = new ArrayList<>();
             IndexInfo primary = mockIndex(true, componentIDs, resultPrimaryIDs, 0);
 
-            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
+            List<ILSMComponentId> resultSecondaryIDs = new ArrayList<>();
             IndexInfo secondary = mockIndex(false, componentIDs, resultSecondaryIDs, 0);
 
-            List<ILSMDiskComponentId> resultSecondaryIDs1 = new ArrayList<>();
+            List<ILSMComponentId> resultSecondaryIDs1 = new ArrayList<>();
             IndexInfo secondary1 = mockIndex(false, componentIDs, resultSecondaryIDs, 1);
 
             ILSMMergePolicy policy = mockMergePolicy(primary, secondary, secondary1);
@@ -174,10 +172,10 @@
 
             policy.diskComponentAdded(primary.getIndex(), false);
 
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
-                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
-                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultSecondaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+                    new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultPrimaryIDs);
+            Assert.assertEquals(Arrays.asList(new LSMComponentId(30, 35), new LSMComponentId(25, 29),
+                    new LSMComponentId(20, 24), new LSMComponentId(10, 19)), resultSecondaryIDs);
             Assert.assertTrue(resultSecondaryIDs1.isEmpty());
         } catch (HyracksDataException e) {
             Assert.fail(e.getMessage());
@@ -205,12 +203,12 @@
         return policy;
     }
 
-    private IndexInfo mockIndex(boolean isPrimary, List<ILSMDiskComponentId> componentIDs,
-            List<ILSMDiskComponentId> resultComponentIDs, int partition) throws HyracksDataException {
+    private IndexInfo mockIndex(boolean isPrimary, List<ILSMComponentId> componentIDs,
+            List<ILSMComponentId> resultComponentIDs, int partition) throws HyracksDataException {
         List<ILSMDiskComponent> components = new ArrayList<>();
-        for (ILSMDiskComponentId id : componentIDs) {
+        for (ILSMComponentId id : componentIDs) {
             ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
-            Mockito.when(component.getComponentId()).thenReturn(id);
+            Mockito.when(component.getId()).thenReturn(id);
             Mockito.when(component.getComponentSize()).thenReturn(DEFAULT_COMPONENT_SIZE);
             Mockito.when(component.getState()).thenReturn(ComponentState.READABLE_UNWRITABLE);
             components.add(component);
@@ -227,7 +225,7 @@
                 List<ILSMDiskComponent> mergedComponents = invocation.getArgumentAt(1, List.class);
                 mergedComponents.forEach(component -> {
                     try {
-                        resultComponentIDs.add(component.getComponentId());
+                        resultComponentIDs.add(component.getId());
                     } catch (HyracksDataException e) {
                         e.printStackTrace();
                     }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
index d48227f..f467ee8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.junit.Assert;
 import org.mockito.Mockito;
 
@@ -34,7 +35,8 @@
         try {
             ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
             Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex);
+            LSMBTreeIOOperationCallback callback =
+                    new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
 
             //request to flush first component
             callback.updateLastLSN(1);
@@ -58,7 +60,8 @@
         try {
             ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
             Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex);
+            LSMBTreeIOOperationCallback callback =
+                    new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
 
             //request to flush first component
             callback.updateLastLSN(1);
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
index 94ef0a3..63c46f7 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.junit.Assert;
 import org.mockito.Mockito;
 
@@ -34,7 +35,8 @@
         try {
             ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
             Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMBTreeWithBuddyIOOperationCallback callback = new LSMBTreeWithBuddyIOOperationCallback(mockIndex);
+            LSMBTreeWithBuddyIOOperationCallback callback =
+                    new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
 
             //request to flush first component
             callback.updateLastLSN(1);
@@ -58,7 +60,8 @@
         try {
             ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
             Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMBTreeWithBuddyIOOperationCallback callback = new LSMBTreeWithBuddyIOOperationCallback(mockIndex);
+            LSMBTreeWithBuddyIOOperationCallback callback =
+                    new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
 
             //request to flush first component
             callback.updateLastLSN(1);
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
index b213da0..1e961d8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.junit.Assert;
 import org.mockito.Mockito;
 
@@ -34,7 +35,8 @@
         try {
             ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
             Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMInvertedIndexIOOperationCallback callback = new LSMInvertedIndexIOOperationCallback(mockIndex);
+            LSMInvertedIndexIOOperationCallback callback =
+                    new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
 
             //request to flush first component
             callback.updateLastLSN(1);
@@ -58,7 +60,8 @@
         try {
             ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
             Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMInvertedIndexIOOperationCallback callback = new LSMInvertedIndexIOOperationCallback(mockIndex);
+            LSMInvertedIndexIOOperationCallback callback =
+                    new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
 
             //request to flush first component
             callback.updateLastLSN(1);
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
index df26ef9..618f2a3 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.junit.Assert;
 import org.mockito.Mockito;
 
@@ -34,7 +35,8 @@
         try {
             ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
             Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMRTreeIOOperationCallback callback = new LSMRTreeIOOperationCallback(mockIndex);
+            LSMRTreeIOOperationCallback callback =
+                    new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
 
             //request to flush first component
             callback.updateLastLSN(1);
@@ -58,7 +60,8 @@
         try {
             ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
             Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-            LSMRTreeIOOperationCallback callback = new LSMRTreeIOOperationCallback(mockIndex);
+            LSMRTreeIOOperationCallback callback =
+                    new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
 
             //request to flush first component
             callback.updateLastLSN(1);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index a6f1ad0..d2622c4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -79,6 +80,7 @@
 import org.apache.hyracks.storage.am.common.build.IndexBuilder;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -324,7 +326,10 @@
         ILSMOperationTrackerFactory opTrackerFactory =
                 index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(datasetId)
                         : new SecondaryIndexOperationTrackerFactory(datasetId);
-        ILSMIOOperationCallbackFactory ioOpCallbackFactory = LSMBTreeIOOperationCallbackFactory.INSTANCE;
+        ILSMComponentIdGeneratorFactory idGeneratorProvider =
+                new DatasetLSMComponentIdGeneratorFactory(index.getDatasetId().getId());
+        ILSMIOOperationCallbackFactory ioOpCallbackFactory =
+                new LSMBTreeIOOperationCallbackFactory(idGeneratorProvider);
         IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider();
         if (isNewUniverse()) {
             LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory(
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 2f94ad74..fb9901d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -87,6 +87,8 @@
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -133,7 +135,6 @@
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -595,9 +596,9 @@
             // bulkload?)
             IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
-                    new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
-                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory);
+            LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
+                    fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
+                    indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId());
             return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
@@ -1001,8 +1002,9 @@
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+                op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
+                        BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
                         null, true, modificationCallbackFactory);
@@ -1135,8 +1137,9 @@
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+                op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
+                        BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else if (indexOp == IndexOperation.UPSERT) {
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
                         filterFactory, modificationCallbackFactory, prevFieldPermutation);
@@ -1237,9 +1240,9 @@
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+                op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
                         GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
-                        indexDataflowHelperFactory);
+                        indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else if (indexOp == IndexOperation.UPSERT) {
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
                         indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation);
@@ -1353,8 +1356,9 @@
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory);
+                op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
+                        null, BulkLoadUsage.LOAD, dataset.getDatasetId());
             } else if (indexOp == IndexOperation.UPSERT) {
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
                         filterFactory, modificationCallbackFactory, prevFieldPermutation);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 3fec73b..e5f97f0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveNotificationHandler;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
@@ -107,6 +108,7 @@
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -505,15 +507,15 @@
             case BTREE:
                 return getDatasetType() == DatasetType.EXTERNAL
                         && !index.getIndexName().equals(IndexingConstants.getFilesIndexName(getDatasetName()))
-                                ? LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
-                                : LSMBTreeIOOperationCallbackFactory.INSTANCE;
+                                ? new LSMBTreeWithBuddyIOOperationCallbackFactory(getComponentIdGeneratorFactory())
+                                : new LSMBTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory());
             case RTREE:
-                return LSMRTreeIOOperationCallbackFactory.INSTANCE;
+                return new LSMRTreeIOOperationCallbackFactory(getComponentIdGeneratorFactory());
             case LENGTH_PARTITIONED_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case SINGLE_PARTITION_WORD_INVIX:
-                return LSMInvertedIndexIOOperationCallbackFactory.INSTANCE;
+                return new LSMInvertedIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory());
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
                         index.getIndexType().toString());
@@ -532,6 +534,10 @@
                 : new SecondaryIndexOperationTrackerFactory(getDatasetId());
     }
 
+    public ILSMComponentIdGeneratorFactory getComponentIdGeneratorFactory() {
+        return new DatasetLSMComponentIdGeneratorFactory(getDatasetId());
+    }
+
     /**
      * Get search callback factory for this dataset with the passed index and operation
      *
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 5dac407..7701f65 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,8 +19,6 @@
 
 package org.apache.asterix.metadata.utils;
 
-import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +49,8 @@
 import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -71,6 +71,8 @@
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -124,8 +126,8 @@
         this.index = index;
         this.physOptConf = physOptConf;
         this.metadataProvider = metadataProvider;
-        this.itemType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
-                dataset.getItemTypeName());
+        this.itemType =
+                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         this.metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
         Pair<ARecordType, ARecordType> enforcedTypes = getEnforcedType(index, itemType, metaType);
         this.enforcedItemType = enforcedTypes.first;
@@ -341,11 +343,15 @@
         return sortOp;
     }
 
-    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
+    protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
             int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
             throws AlgebricksException {
-        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory);
+        IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory(
+                metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+
+        LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
+                secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
+                primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return treeIndexBulkLoadOp;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
new file mode 100644
index 0000000..74590c7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+
+public class LSMIndexBulkLoadOperatorDescriptor extends TreeIndexBulkLoadOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public enum BulkLoadUsage {
+        LOAD,
+        CREATE_INDEX
+    }
+
+    protected final IIndexDataflowHelperFactory primaryIndexHelperFactory;
+
+    protected final BulkLoadUsage usage;
+
+    protected final int datasetId;
+
+    public LSMIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, IIndexDataflowHelperFactory indexHelperFactory,
+            IIndexDataflowHelperFactory primaryIndexHelperFactory, BulkLoadUsage usage, int datasetId) {
+        super(spec, outRecDesc, fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                indexHelperFactory);
+        this.primaryIndexHelperFactory = primaryIndexHelperFactory;
+        this.usage = usage;
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new LSMIndexBulkLoadOperatorNodePushable(indexHelperFactory, primaryIndexHelperFactory, ctx, partition,
+                fieldPermutation, fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                recordDescProvider.getInputRecordDescriptor(this.getActivityId(), 0), usage, datasetId);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
new file mode 100644
index 0000000..2415556
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+
+public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
+    protected final BulkLoadUsage usage;
+
+    protected final IIndexDataflowHelper primaryIndexHelper;
+    protected final IDatasetLifecycleManager datasetManager;
+    protected final int datasetId;
+    protected final int partition;
+
+    protected ILSMIndex primaryIndex;
+
+    public LSMIndexBulkLoadOperatorNodePushable(IIndexDataflowHelperFactory indexDataflowHelperFactory,
+            IIndexDataflowHelperFactory priamryIndexDataflowHelperFactory, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex, RecordDescriptor recDesc, BulkLoadUsage usage, int datasetId)
+            throws HyracksDataException {
+        super(indexDataflowHelperFactory, ctx, partition, fieldPermutation, fillFactor, verifyInput, numElementsHint,
+                checkIfEmptyIndex, recDesc);
+
+        if (priamryIndexDataflowHelperFactory != null) {
+            this.primaryIndexHelper =
+                    priamryIndexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        } else {
+            this.primaryIndexHelper = null;
+        }
+        this.usage = usage;
+        this.datasetId = datasetId;
+        this.partition = partition;
+        INcApplicationContext ncCtx =
+                (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+        datasetManager = ncCtx.getDatasetLifecycleManager();
+    }
+
+    @Override
+    protected void initializeBulkLoader() throws HyracksDataException {
+        ILSMIndex targetIndex = (ILSMIndex) index;
+        if (usage.equals(BulkLoadUsage.LOAD)) {
+            // for a loaded dataset, we use the default Id 0 which is guaranteed to be smaller
+            // than Ids of all memory components
+
+            // TODO handle component Id for datasets loaded multiple times
+            // TODO move this piece of code to io operation callback
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
+            LSMComponentIdUtils.persist(LSMComponentId.DEFAULT_COMPONENT_ID, diskComponent.getMetadata());
+        } else {
+            primaryIndexHelper.open();
+            primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
+            List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            if (!primaryComponents.isEmpty()) {
+                // TODO move this piece of code to io operation callback
+                // Ideally, this should be done in io operation callback when a bulk load operation is finished
+                // However, currently we don't have an extensible callback mechanism to support this
+                ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
+                        primaryComponents.get(primaryComponents.size() - 1).getId());
+                ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
+                LSMComponentIdUtils.persist(bulkloadId, diskComponent.getMetadata());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            super.close();
+        } finally {
+            if (primaryIndex != null) {
+                primaryIndexHelper.close();
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index cee20ce..6d9ec47 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -19,24 +19,20 @@
 package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 
 import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 
 /**
  * This operator node is used to bulk load incoming tuples (scanned from the primary index)
@@ -56,12 +52,9 @@
     private ILSMIndex primaryIndex;
     private ILSMIndex secondaryIndex;
 
-    private ILSMDiskComponent component;
-    private ILSMDiskComponentBulkLoader componentBulkLoader;
+    private LSMIndexDiskComponentBulkLoader componentBulkLoader;
     private int currentComponentPos = -1;
 
-    private ILSMDiskComponent[] diskComponents;
-
     public LSMSecondaryIndexBulkLoadNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
             IIndexDataflowHelperFactory primaryIndexHelperFactory,
             IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] fieldPermutation, int numTagFields,
@@ -92,7 +85,6 @@
         super.open();
         primaryIndexHelper.open();
         primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
-        diskComponents = new ILSMDiskComponent[primaryIndex.getDiskComponents().size()];
         secondaryIndexHelper.open();
         secondaryIndex = (ILSMIndex) secondaryIndexHelper.getIndexInstance();
 
@@ -107,8 +99,6 @@
             closeException = e;
         }
 
-        activateComponents();
-
         try {
             if (primaryIndexHelper != null) {
                 primaryIndexHelper.close();
@@ -184,24 +174,22 @@
     }
 
     private void endCurrentComponent() throws HyracksDataException {
-        if (component != null) {
-            // set disk component id
-
+        if (componentBulkLoader != null) {
             componentBulkLoader.end();
-            diskComponents[currentComponentPos] = component;
-
             componentBulkLoader = null;
-            component = null;
         }
     }
 
     private void loadNewComponent(int componentPos) throws HyracksDataException {
         endCurrentComponent();
 
-        component = secondaryIndex.createBulkLoadTarget();
         int numTuples = getNumDeletedTuples(componentPos);
-        componentBulkLoader = component.createBulkLoader(1.0f, false, numTuples, false, true, true);
-
+        ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos);
+        componentBulkLoader =
+                (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples, false);
+        ILSMDiskComponent diskComponent = componentBulkLoader.getComponent();
+        // TODO move this piece of code to io operation callback
+        LSMComponentIdUtils.persist(primaryComponent.getId(), diskComponent.getMetadata());
     }
 
     private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {
@@ -220,30 +208,6 @@
 
     }
 
-    private void activateComponents() throws HyracksDataException {
-        List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
-        for (int i = diskComponents.length - 1; i >= 0; i--) {
-            // start from the oldest component to the newest component
-            if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) {
-                secondaryIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.FLUSH, null,
-                        diskComponents[i]);
-
-                // setting component id has to be place between afterOperation and addBulkLoadedComponent,
-                // since afterOperation would set a flush component id (but it's not invalid)
-                // and addBulkLoadedComponent would finalize the component
-                ILSMDiskComponentId primaryComponentId = primaryComponents.get(i).getComponentId();
-                //set component id
-                diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
-                        LongPointable.FACTORY.createPointable(primaryComponentId.getMinId()));
-                diskComponents[i].getMetadata().put(ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
-                        LongPointable.FACTORY.createPointable(primaryComponentId.getMaxId()));
-
-                ((AbstractLSMIndex) secondaryIndex).getLsmHarness().addBulkLoadedComponent(diskComponents[i]);
-
-            }
-        }
-    }
-
     private int getNumDeletedTuples(int componentPos) {
         DeletedTupleCounter counter = (DeletedTupleCounter) ctx.getStateObject(partition);
         return counter.get(componentPos);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 5fc07ad..095159b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -65,7 +65,7 @@
         index = indexHelper.getIndexInstance();
         try {
             writer.open();
-            bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            initializeBulkLoader();
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -116,4 +116,8 @@
             writer.fail();
         }
     }
+
+    protected void initializeBulkLoader() throws HyracksDataException {
+        bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 6083637..a859f68 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -57,6 +57,7 @@
     public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
+        ioOpCallbackFactory.initialize(serviceCtx);
         return LSMBTreeUtil.createExternalBTree(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits,
                 cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 04b63f9..9422253 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -60,6 +60,7 @@
     public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
+        ioOpCallbackFactory.initialize(serviceCtx);
         return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index dfa88da..1988736 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -70,6 +70,7 @@
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
         List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+        ioOpCallbackFactory.initialize(serviceCtx);
         //TODO: enable updateAwareness for secondary LSMBTree indexes
         boolean updateAware = false;
         return LSMBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index a8e707f..d759167 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -24,11 +24,10 @@
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
-public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent implements ILSMDiskComponent {
+public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent {
     protected final BTree btree;
 
     public LSMBTreeDiskComponent(AbstractLSMIndex lsmIndex, BTree btree, ILSMComponentFilter filter) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index a60f544..ab8e899 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -127,4 +127,11 @@
      * @return index data structure that is the stored in the component
      */
     IIndex getIndex();
+
+    /**
+     *
+     * @return id of the component
+     * @throws HyracksDataException
+     */
+    ILSMComponentId getId() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
new file mode 100644
index 0000000..5662862
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+/**
+ * Stores the id of the disk component, which is a interval (minId, maxId).
+ * It is generated by {@link ILSMComponentIdGenerator}
+ *
+ */
+public interface ILSMComponentId {
+    public enum IdCompareResult {
+        UNKNOWN,
+        LESS_THAN,
+        GREATER_THAN,
+        INTERSECT,
+        INCLUDE
+    }
+
+    /**
+     * @return whether the id is missing
+     */
+    boolean missing();
+
+    IdCompareResult compareTo(ILSMComponentId id);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
new file mode 100644
index 0000000..5dd3061
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+/**
+ * This interface generates component Ids for LSM components (both memory and disk components).
+ */
+public interface ILSMComponentIdGenerator {
+
+    /**
+     * @return An Id for LSM component
+     */
+    public ILSMComponentId getId();
+
+    /**
+     * Refresh the component Id generator to generate the next Id.
+     * {@link #getId()} would always return the same Id before this method is called.
+     */
+    public void refresh();
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..c0f530b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+
+@FunctionalInterface
+public interface ILSMComponentIdGeneratorFactory extends Serializable {
+
+    ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 43c5482..bd2bb45 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -48,14 +48,6 @@
     int getFileReferenceCount();
 
     /**
-     * Return the component Id of this disk component from its metadata
-     *
-     * @return
-     * @throws HyracksDataException
-     */
-    ILSMDiskComponentId getComponentId() throws HyracksDataException;
-
-    /**
      * @return LsmIndex of the component
      */
     AbstractLSMIndex getLsmIndex();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
deleted file mode 100644
index 5d38ace..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponentId.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.common.api;
-
-import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
-
-/**
- * Stores the id of the disk component, which is a interval (minId, maxId).
- * When a disk component is formed by the flush operation, its initial minId and maxId are the same, and
- * currently are set as the flush LSN.
- * When a disk component is formed by the merge operation, its [minId, maxId] is set as the union of
- * all ids of merged disk components.
- *
- * @author luochen
- *
- */
-public interface ILSMDiskComponentId {
-
-    public static final long NOT_FOUND = -1;
-
-    public static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
-            new MutableArrayValueReference("Component_Id_Min".getBytes());
-
-    public static final MutableArrayValueReference COMPONENT_ID_MAX_KEY =
-            new MutableArrayValueReference("Component_Id_Max".getBytes());
-
-    long getMinId();
-
-    long getMaxId();
-
-    default boolean notFound() {
-        return getMinId() == NOT_FOUND || getMaxId() == NOT_FOUND;
-    }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
index b291f7c..a9dc50e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -20,7 +20,15 @@
 
 import java.io.Serializable;
 
-@FunctionalInterface
+import org.apache.hyracks.api.application.INCServiceContext;
+
 public interface ILSMIOOperationCallbackFactory extends Serializable {
+    /**
+     * Initialize the callback factory with the given ncCtx
+     *
+     * @param ncCtx
+     */
+    void initialize(INCServiceContext ncCtx);
+
     ILSMIOOperationCallback createIoOpCallback(ILSMIndex index);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index 13543e4..f892585 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -101,4 +101,12 @@
      * @return the size of the memory component
      */
     long getSize();
+
+    /**
+     * Reset the component Id of the memory component after it's recycled
+     *
+     * @param newId
+     * @throws HyracksDataException
+     */
+    void resetId(ILSMComponentId newId) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index a0d1c23..b664102 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -18,19 +18,29 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent implements ILSMDiskComponent {
 
+    private static final Logger LOGGER = Logger.getLogger(AbstractLSMDiskComponent.class.getName());
+
     private final DiskComponentMetadata metadata;
 
+    // a variable cache of componentId stored in metadata.
+    // since componentId is immutable, we do not want to read from metadata every time the componentId
+    // is requested.
+    private ILSMComponentId componentId;
+
     public AbstractLSMDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager,
             ILSMComponentFilter filter) {
         super(lsmIndex, filter);
@@ -109,13 +119,23 @@
     }
 
     @Override
-    public ILSMDiskComponentId getComponentId() throws HyracksDataException {
-        long minID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MIN_KEY,
-                ILSMDiskComponentId.NOT_FOUND);
-        long maxID = ComponentUtils.getLong(metadata, ILSMDiskComponentId.COMPONENT_ID_MAX_KEY,
-                ILSMDiskComponentId.NOT_FOUND);
-        //TODO: do we need to throw an exception when ID is not found?
-        return new LSMDiskComponentId(minID, maxID);
+    public ILSMComponentId getId() throws HyracksDataException {
+        if (componentId != null) {
+            return componentId;
+        }
+        synchronized (this) {
+            if (componentId == null) {
+                componentId = LSMComponentIdUtils.readFrom(metadata);
+            }
+        }
+        if (componentId.missing()) {
+            // For normal datasets, componentId shouldn't be missing, since otherwise it'll be a bug.
+            // However, we cannot throw an exception here to be compatible with legacy datasets.
+            // In this case, the disk component would always get a garbage Id [-1, -1], which makes the
+            // component Id-based optimization useless but still correct.
+            LOGGER.warning("Component Id not found from disk component metadata");
+        }
+        return componentId;
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 2b2fe0d..b0cc318 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -44,6 +44,8 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -438,6 +440,7 @@
         if (c != EmptyComponent.INSTANCE) {
             diskComponents.add(0, c);
         }
+        assert checkComponentIds();
     }
 
     @Override
@@ -448,6 +451,25 @@
         if (newComponent != EmptyComponent.INSTANCE) {
             diskComponents.add(swapIndex, newComponent);
         }
+        assert checkComponentIds();
+    }
+
+    /**
+     * A helper method to ensure disk components have proper Ids (non-decreasing)
+     * We may get rid of this method once component Id is stablized
+     *
+     * @throws HyracksDataException
+     */
+    private boolean checkComponentIds() throws HyracksDataException {
+        for (int i = 0; i < diskComponents.size() - 1; i++) {
+            ILSMComponentId id1 = diskComponents.get(i).getId();
+            ILSMComponentId id2 = diskComponents.get(i + 1).getId();
+            IdCompareResult cmp = id1.compareTo(id2);
+            if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) {
+                return false;
+            }
+        }
+        return true;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index b7c3350..0378aae 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -22,9 +22,12 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 
 public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent implements ILSMMemoryComponent {
@@ -34,6 +37,7 @@
     private int writerCount;
     private boolean requestedToBeActive;
     private final MemoryComponentMetadata metadata;
+    private ILSMComponentId componentId;
 
     public AbstractLSMMemoryComponent(AbstractLSMIndex lsmIndex, IVirtualBufferCache vbc, boolean isActive,
             ILSMComponentFilter filter) {
@@ -247,6 +251,7 @@
     protected void doDeallocate() throws HyracksDataException {
         getIndex().deactivate();
         getIndex().destroy();
+        componentId = null;
     }
 
     @Override
@@ -259,4 +264,19 @@
         IBufferCache virtualBufferCache = getIndex().getBufferCache();
         return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize();
     }
+
+    @Override
+    public ILSMComponentId getId() {
+        return componentId;
+    }
+
+    @Override
+    public void resetId(ILSMComponentId componentId) throws HyracksDataException {
+        if (this.componentId != null && this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) {
+            throw new IllegalStateException(
+                    "LSM memory component receives illegal id. Old id " + this.componentId + ", new id " + componentId);
+        }
+        this.componentId = componentId;
+        LSMComponentIdUtils.persist(this.componentId, metadata);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index f2751bf..e3ca9f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -25,8 +25,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndex;
 
@@ -83,8 +83,8 @@
     }
 
     @Override
-    public ILSMDiskComponentId getComponentId() throws HyracksDataException {
-        return null;
+    public ILSMComponentId getId() {
+        return LSMComponentId.MISSING_COMPONENT_ID;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
new file mode 100644
index 0000000..dd86f65
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+public class LSMComponentId implements ILSMComponentId {
+
+    public static final long NOT_FOUND = -1;
+
+    // Use to handle legacy datasets which do not have the component Id
+    public static final ILSMComponentId MISSING_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
+
+    // A default component id used for bulk loaded component
+    public static final ILSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+
+    private long minId;
+
+    private long maxId;
+
+    public LSMComponentId(long minId, long maxId) {
+        assert minId <= maxId;
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+
+    public void reset(long minId, long maxId) {
+        this.minId = minId;
+        this.maxId = maxId;
+    }
+
+    public long getMinId() {
+        return this.minId;
+    }
+
+    public long getMaxId() {
+        return this.maxId;
+    }
+
+    @Override
+    public boolean missing() {
+        return minId == NOT_FOUND || maxId == NOT_FOUND;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + minId + "," + maxId + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof LSMComponentId)) {
+            return false;
+        }
+        LSMComponentId other = (LSMComponentId) obj;
+        if (maxId != other.maxId) {
+            return false;
+        }
+        if (minId != other.minId) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public IdCompareResult compareTo(ILSMComponentId id) {
+        if (this.missing() || id == null || id.missing()) {
+            return IdCompareResult.UNKNOWN;
+        }
+        LSMComponentId componentId = (LSMComponentId) id;
+        if (this.getMinId() > componentId.getMaxId()) {
+            return IdCompareResult.GREATER_THAN;
+        } else if (this.getMaxId() < componentId.getMinId()) {
+            return IdCompareResult.LESS_THAN;
+        } else if (this.getMinId() <= componentId.getMinId() && this.getMaxId() >= componentId.getMaxId()) {
+            return IdCompareResult.INCLUDE;
+        } else {
+            return IdCompareResult.INTERSECT;
+        }
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
new file mode 100644
index 0000000..e174153
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGenerator}.
+ *
+ */
+public class LSMComponentIdGenerator implements ILSMComponentIdGenerator {
+
+    protected long previousTimestamp = -1L;
+
+    private ILSMComponentId componentId;
+
+    public LSMComponentIdGenerator() {
+        refresh();
+    }
+
+    @Override
+    public void refresh() {
+        long ts = getCurrentTimestamp();
+        componentId = new LSMComponentId(ts, ts);
+    }
+
+    @Override
+    public ILSMComponentId getId() {
+        return componentId;
+    }
+
+    protected long getCurrentTimestamp() {
+        long timestamp = System.currentTimeMillis();
+        while (timestamp <= previousTimestamp) {
+            // make sure timestamp is strictly increasing
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            timestamp = System.currentTimeMillis();
+        }
+        previousTimestamp = timestamp;
+        return timestamp;
+
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
new file mode 100644
index 0000000..c55ef19
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+
+/**
+ * A default implementation of {@link ILSMComponentIdGeneratorFactory}.
+ *
+ */
+public class LSMComponentIdGeneratorFactory implements ILSMComponentIdGeneratorFactory {
+
+    @Override
+    public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+        return new LSMComponentIdGenerator();
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
deleted file mode 100644
index f448c84..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMDiskComponentId.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.storage.am.lsm.common.impls;
-
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
-
-public class LSMDiskComponentId implements ILSMDiskComponentId {
-
-    private final long minId;
-
-    private final long maxId;
-
-    public LSMDiskComponentId(long minId, long maxId) {
-        this.minId = minId;
-        this.maxId = maxId;
-    }
-
-    @Override
-    public long getMinId() {
-        return this.minId;
-    }
-
-    @Override
-    public long getMaxId() {
-        return this.maxId;
-    }
-
-    @Override
-    public String toString() {
-        return "[" + minId + "," + maxId + "]";
-    }
-
-    @Override
-    public int hashCode() {
-        return 31 * Long.hashCode(minId) + Long.hashCode(maxId);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (!(obj instanceof LSMDiskComponentId)) {
-            return false;
-        }
-        LSMDiskComponentId other = (LSMDiskComponentId) obj;
-        if (maxId != other.maxId) {
-            return false;
-        }
-        if (minId != other.minId) {
-            return false;
-        }
-        return true;
-    }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 48b6d8f..b0abeb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -95,15 +95,24 @@
                 // Before entering the components, prune those corner cases that indeed should not proceed.
                 switch (opType) {
                     case FLUSH:
+                        // if the lsm index does not have memory components allocated, then nothing to flush
+                        if (!lsmIndex.isMemoryComponentsAllocated()) {
+                            return false;
+                        }
                         ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
                         if (!flushingComponent.isModified()) {
-                            //The mutable component has not been modified by any writer. There is nothing to flush.
-                            //since the component is empty, set its state back to READABLE_WRITABLE
                             if (flushingComponent.getState() == ComponentState.READABLE_UNWRITABLE) {
+                                //The mutable component has not been modified by any writer. There is nothing to flush.
+                                //since the component is empty, set its state back to READABLE_WRITABLE only when it's
+                                //state has been set to READABLE_UNWRITABLE
                                 flushingComponent.setState(ComponentState.READABLE_WRITABLE);
                                 opTracker.notifyAll();
+
+                                // Call recycled only when we change it's state is reset back to READABLE_WRITABLE
+                                // Otherwise, if the component is in other state, e.g., INACTIVE, or
+                                // READABLE_UNWRITABLE_FLUSHING, it's not considered as being recycled here.
+                                lsmIndex.getIOOperationCallback().recycled(flushingComponent);
                             }
-                            lsmIndex.getIOOperationCallback().recycled(flushingComponent);
                             return false;
                         }
                         if (flushingComponent.getWriterCount() > 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 08b8bb6..000d5cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -21,13 +21,14 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 
 public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
     private final AbstractLSMIndex lsmIndex;
     private final ILSMDiskComponent component;
-    private final IIndexBulkLoader componentBulkLoader;
+    private final ILSMDiskComponentBulkLoader componentBulkLoader;
 
     public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput,
             long numElementsHint) throws HyracksDataException {
@@ -39,11 +40,19 @@
                 component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
     }
 
+    public ILSMDiskComponent getComponent() {
+        return component;
+    }
+
     @Override
     public void add(ITupleReference tuple) throws HyracksDataException {
         componentBulkLoader.add(tuple);
     }
 
+    public void delete(ITupleReference tuple) throws HyracksDataException {
+        componentBulkLoader.delete(tuple);
+    }
+
     @Override
     public void end() throws HyracksDataException {
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 09ca553..21d10d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -37,6 +38,11 @@
         return NoOpIOOperationCallback.INSTANCE;
     }
 
+    @Override
+    public void initialize(INCServiceContext ncCtx) {
+        // No op
+    }
+
     public static class NoOpIOOperationCallback implements ILSMIOOperationCallback {
         private static final NoOpIOOperationCallback INSTANCE = new NoOpIOOperationCallback();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
new file mode 100644
index 0000000..3c88543
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+public class LSMComponentIdUtils {
+
+    private static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
+            new MutableArrayValueReference("Component_Id_Min".getBytes());
+
+    private static final MutableArrayValueReference COMPONENT_ID_MAX_KEY =
+            new MutableArrayValueReference("Component_Id_Max".getBytes());
+
+    private LSMComponentIdUtils() {
+
+    }
+
+    public static ILSMComponentId readFrom(IComponentMetadata metadata) throws HyracksDataException {
+        long minId = ComponentUtils.getLong(metadata, COMPONENT_ID_MIN_KEY, LSMComponentId.NOT_FOUND);
+        long maxId = ComponentUtils.getLong(metadata, COMPONENT_ID_MAX_KEY, LSMComponentId.NOT_FOUND);
+        if (minId == LSMComponentId.NOT_FOUND || maxId == LSMComponentId.NOT_FOUND) {
+            return LSMComponentId.MISSING_COMPONENT_ID;
+        } else {
+            return new LSMComponentId(minId, maxId);
+        }
+    }
+
+    public static void persist(ILSMComponentId id, IComponentMetadata metadata) throws HyracksDataException {
+        LSMComponentId componentId = (LSMComponentId) id;
+        metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(componentId.getMinId()));
+        metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+    }
+
+    public static ILSMComponentId union(ILSMComponentId id1, ILSMComponentId id2) {
+        long minId = Long.min(((LSMComponentId) id1).getMinId(), ((LSMComponentId) id2).getMinId());
+        long maxId = Long.max(((LSMComponentId) id1).getMaxId(), ((LSMComponentId) id2).getMaxId());
+        return new LSMComponentId(minId, maxId);
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
index cdb55bf..a45f006 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
@@ -87,6 +87,7 @@
         IBufferCache bufferCache = storageManager.getBufferCache(serviceCtx);
         ILSMMergePolicy mergePolicy = mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx);
         ILSMIOOperationScheduler ioScheduler = ioSchedulerProvider.getIoScheduler(serviceCtx);
+        ioOpCallbackFactory.initialize(serviceCtx);
         if (isPartitioned) {
             return InvertedIndexUtils.createPartitionedLSMInvertedIndex(ioManager, virtualBufferCaches, typeTraits,
                     cmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, bufferCache,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
index f0ea5b8..6918b19 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
@@ -66,6 +66,7 @@
     public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
         IIOManager ioManager = ncServiceCtx.getIoManager();
         FileReference fileRef = ioManager.resolve(path);
+        ioOpCallbackFactory.initialize(ncServiceCtx);
         return LSMRTreeUtils.createExternalRTree(ioManager, fileRef, storageManager.getBufferCache(ncServiceCtx),
                 typeTraits, cmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType,
                 bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx),
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
index 1fa5081..f6396cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
@@ -83,6 +83,7 @@
         IIOManager ioManager = ncServiceCtx.getIoManager();
         FileReference fileRef = ioManager.resolve(path);
         List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(ncServiceCtx, fileRef);
+        ioOpCallbackFactory.initialize(ncServiceCtx);
         return LSMRTreeUtils.createLSMTree(ioManager, virtualBufferCaches, fileRef,
                 storageManager.getBufferCache(ncServiceCtx), typeTraits, cmpFactories, btreeCmpFactories,
                 valueProviderFactories, rtreePolicyType, bloomFilterFalsePositiveRate,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
index aea205d..e807c72 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
@@ -78,6 +78,7 @@
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
         List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+        ioOpCallbackFactory.initialize(serviceCtx);
         return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(ioManager, virtualBufferCaches, file,
                 storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, btreeComparatorFactories,
                 valueProviderFactories, rtreePolicyType,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 1a5121b..a20eeb7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -236,7 +236,7 @@
         }
         ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(opCtx, returnDeletedTuples);
         ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(getLsmHarness(), opCtx, cursorFactory);
-        return new MergeOperation(accessor, mergeFileRefs.getInsertIndexFileReference(), callback,
-                fileManager.getBaseDir().getAbsolutePath(), cursor);
+        return new LSMRTreeMergeOperation(accessor, cursor, mergeFileRefs.getInsertIndexFileReference(), null, null,
+                callback, fileManager.getBaseDir().getAbsolutePath());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index d8dda71..47a8046 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -60,6 +60,7 @@
         IIOManager ioManager = serviceCtx.getIoManager();
         FileReference file = ioManager.resolve(path);
         List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+        ioOpCallbackFactory.initialize(serviceCtx);
         return TestLsmBtreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
                 typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),