Merge commit '95f508bbb1d07650392c21da2958ba1c53f2a03d' from stabilization-f69489

Change-Id: Ib6a428564681938dceb6ad4a2ba91f15256dc8a2
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
index fa9385a..683d29f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.dataflow.data.common.TypeResolverUtil;
 import org.apache.asterix.lang.common.util.FunctionUtil;
@@ -38,14 +39,23 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
- * This rule injects cast functions for "THEN" and "ELSE" branches of a switch-case function if
- * different "THEN" and "ELSE" branches have heterogeneous return types.
+ * This rule injects casts for function parameters if they have heterogeneous return types:
+ * <ul>
+ *     <li>for "THEN" and "ELSE" branches of a switch-case function</li>
+ *     <li>for parameters of "if missing/null" functions  (if-missing(), if-null(), if-missing-or-null())</li>
+ * </ul>
  */
 public class InjectTypeCastForSwitchCaseRule implements IAlgebraicRewriteRule {
 
+    private static final Set<FunctionIdentifier> IF_FUNCTIONS =
+            ImmutableSet.of(BuiltinFunctions.IF_MISSING, BuiltinFunctions.IF_NULL, BuiltinFunctions.IF_MISSING_OR_NULL);
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
@@ -80,10 +90,17 @@
                 rewritten = true;
             }
         }
-        if (!func.getFunctionIdentifier().equals(BuiltinFunctions.SWITCH_CASE)) {
-            return rewritten;
+        FunctionIdentifier funcId = func.getFunctionIdentifier();
+        if (funcId.equals(BuiltinFunctions.SWITCH_CASE)) {
+            if (rewriteSwitchCase(op, func, context)) {
+                rewritten = true;
+            }
+        } else if (IF_FUNCTIONS.contains(funcId)) {
+            if (rewriteFunction(op, func, context)) {
+                rewritten = true;
+            }
         }
-        return rewriteSwitchCase(op, func, context);
+        return rewritten;
     }
 
     // Injects casts that cast types for different "THEN" and "ELSE" branches.
@@ -96,20 +113,44 @@
         boolean rewritten = false;
         for (int argIndex = 2; argIndex < argSize; argIndex += (argIndex + 2 == argSize) ? 1 : 2) {
             Mutable<ILogicalExpression> argRef = argRefs.get(argIndex);
-            IAType type = (IAType) env.getType(argRefs.get(argIndex).getValue());
-            if (TypeResolverUtil.needsCast(producedType, type)) {
-                ILogicalExpression argExpr = argRef.getValue();
-                // Injects a cast call to cast the data type to the produced type of the switch-case function call.
-                ScalarFunctionCallExpression castFunc =
-                        new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.CAST_TYPE),
-                                new ArrayList<>(Collections.singletonList(new MutableObject<>(argExpr))));
-                castFunc.setSourceLocation(argExpr.getSourceLocation());
-                TypeCastUtils.setRequiredAndInputTypes(castFunc, producedType, type);
-                argRef.setValue(castFunc);
+            if (rewriteFunctionArgument(argRef, producedType, env)) {
                 rewritten = true;
             }
         }
         return rewritten;
     }
 
+    // Injects casts that cast types for all function parameters
+    private boolean rewriteFunction(ILogicalOperator op, AbstractFunctionCallExpression func,
+            IOptimizationContext context) throws AlgebricksException {
+        IVariableTypeEnvironment env = op.computeInputTypeEnvironment(context);
+        IAType producedType = (IAType) env.getType(func);
+        List<Mutable<ILogicalExpression>> argRefs = func.getArguments();
+        int argSize = argRefs.size();
+        boolean rewritten = false;
+        for (int argIndex = 0; argIndex < argSize; argIndex++) {
+            Mutable<ILogicalExpression> argRef = argRefs.get(argIndex);
+            if (rewriteFunctionArgument(argRef, producedType, env)) {
+                rewritten = true;
+            }
+        }
+        return rewritten;
+    }
+
+    private boolean rewriteFunctionArgument(Mutable<ILogicalExpression> argRef, IAType funcOutputType,
+            IVariableTypeEnvironment env) throws AlgebricksException {
+        ILogicalExpression argExpr = argRef.getValue();
+        IAType type = (IAType) env.getType(argExpr);
+        if (TypeResolverUtil.needsCast(funcOutputType, type)) {
+            // Injects a cast call to cast the data type to the produced type of the function call.
+            ScalarFunctionCallExpression castFunc =
+                    new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.CAST_TYPE),
+                            new ArrayList<>(Collections.singletonList(new MutableObject<>(argExpr))));
+            castFunc.setSourceLocation(argExpr.getSourceLocation());
+            TypeCastUtils.setRequiredAndInputTypes(castFunc, funcOutputType, type);
+            argRef.setValue(castFunc);
+            return true;
+        }
+        return false;
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index d2b456f..0ea16ae 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -220,13 +220,17 @@
         List<ILogicalOperator> subRoots = new ArrayList<>();
         for (Pair<IAccessMethod, Index> pair : chosenIndexes) {
             AccessMethodAnalysisContext analysisCtx = analyzedAMs.get(pair.first);
-            subRoots.add(pair.first.createIndexSearchPlan(afterSelectRefs, selectRef, conditionRef,
-                    subTree.getAssignsAndUnnestsRefs(), subTree, null, pair.second, analysisCtx,
-                    AccessMethodUtils.retainInputs(subTree.getDataSourceVariables(),
-                            subTree.getDataSourceRef().getValue(), afterSelectRefs),
-                    false, subTree.getDataSourceRef().getValue().getInputs().get(0).getValue()
-                            .getExecutionMode() == ExecutionMode.UNPARTITIONED,
-                    context, null));
+            boolean retainInput = AccessMethodUtils.retainInputs(subTree.getDataSourceVariables(),
+                    subTree.getDataSourceRef().getValue(), afterSelectRefs);
+            boolean requiresBroadcast = subTree.getDataSourceRef().getValue().getInputs().get(0).getValue()
+                    .getExecutionMode() == ExecutionMode.UNPARTITIONED;
+            ILogicalOperator subRoot = pair.first.createIndexSearchPlan(afterSelectRefs, selectRef, conditionRef,
+                    subTree.getAssignsAndUnnestsRefs(), subTree, null, pair.second, analysisCtx, retainInput, false,
+                    requiresBroadcast, context, null);
+            if (subRoot == null) {
+                return false;
+            }
+            subRoots.add(subRoot);
         }
         // Connect each secondary index utilization plan to a common intersect operator.
         ILogicalOperator primaryUnnestOp = connectAll2ndarySearchPlanWithIntersect(subRoots, context);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 64d8e93..3c62d99 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -30,7 +30,6 @@
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Optional;
 
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IndexCheckpoint;
@@ -56,7 +55,7 @@
     }
 
     @Override
-    public synchronized void init(String lastComponentTimestamp, long lsn) throws HyracksDataException {
+    public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException {
         List<IndexCheckpoint> checkpoints;
         try {
             checkpoints = getCheckpoints();
@@ -67,25 +66,24 @@
             LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
             delete();
         }
-        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lastComponentTimestamp, lsn);
+        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn);
         persist(firstCheckpoint);
     }
 
     @Override
-    public synchronized void replicated(String componentTimestamp, long masterLsn, long componentId)
+    public synchronized void replicated(long componentSequence, long masterLsn, long componentId)
             throws HyracksDataException {
         final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
         if (localLsn == null) {
             throw new IllegalStateException("Component flushed before lsn mapping was received");
         }
-        flushed(componentTimestamp, localLsn, componentId);
+        flushed(componentSequence, localLsn, componentId);
     }
 
     @Override
-    public synchronized void flushed(String componentTimestamp, long lsn, long componentId)
-            throws HyracksDataException {
+    public synchronized void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException {
         final IndexCheckpoint latest = getLatest();
-        IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp, componentId);
+        IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentSequence, componentId);
         persist(nextCheckpoint);
         deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
     }
@@ -95,7 +93,7 @@
         final IndexCheckpoint latest = getLatest();
         latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
         final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
-                latest.getValidComponentTimestamp(), latest.getLastComponentId());
+                latest.getValidComponentSequence(), latest.getLastComponentId());
         persist(next);
         notifyAll();
     }
@@ -119,9 +117,8 @@
     }
 
     @Override
-    public Optional<String> getValidComponentTimestamp() throws HyracksDataException {
-        String validComponentTimestamp = getLatest().getValidComponentTimestamp();
-        return validComponentTimestamp != null ? Optional.of(validComponentTimestamp) : Optional.empty();
+    public long getValidComponentSequence() throws HyracksDataException {
+        return getLatest().getValidComponentSequence();
     }
 
     @Override
@@ -153,18 +150,17 @@
     @Override
     public synchronized void setLastComponentId(long componentId) throws HyracksDataException {
         final IndexCheckpoint latest = getLatest();
-        final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
-                latest.getValidComponentTimestamp(), componentId);
+        final IndexCheckpoint next =
+                IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentSequence(), componentId);
         persist(next);
     }
 
     @Override
-    public synchronized void advanceValidComponentTimestamp(String timestamp) throws HyracksDataException {
+    public synchronized void advanceValidComponentSequence(long componentSequence) throws HyracksDataException {
         final IndexCheckpoint latest = getLatest();
-        if (latest.getValidComponentTimestamp() == null
-                || timestamp.compareTo(latest.getValidComponentTimestamp()) > 0) {
-            final IndexCheckpoint next =
-                    IndexCheckpoint.next(latest, latest.getLowWatermark(), timestamp, latest.getLastComponentId());
+        if (componentSequence > latest.getValidComponentSequence()) {
+            final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence,
+                    latest.getLastComponentId());
             persist(next);
         }
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index adf9960..946815f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -300,7 +300,6 @@
         TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
 
         ILogRecord logRecord = null;
-        ILSMComponentIdGenerator idGenerator = null;
         try {
             logReader.setPosition(lowWaterMarkLSN);
             logRecord = logReader.next();
@@ -389,8 +388,7 @@
                         int partition = logRecord.getResourcePartition();
                         if (partitions.contains(partition)) {
                             int datasetId = logRecord.getDatasetId();
-                            idGenerator = datasetLifecycleManager.getComponentIdGenerator(datasetId, partition);
-                            if (idGenerator == null) {
+                            if (!datasetLifecycleManager.isRegistered(datasetId)) {
                                 // it's possible this dataset has been dropped
                                 logRecord = logReader.next();
                                 continue;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 017c59f..5aa1b36 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -56,6 +56,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -75,6 +76,7 @@
     private static ITransactionContext txnCtx;
     private static LSMInsertDeleteOperatorNodePushable insertOp;
     private static final int PARTITION = 0;
+    private static String indexPath;
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -109,6 +111,7 @@
         txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
         insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+        indexPath = indexDataflowHelper.getResource().getPath();
     }
 
     @After
@@ -148,9 +151,9 @@
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
             ILSMComponentId next =
-                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -163,8 +166,8 @@
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
-            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -216,9 +219,9 @@
             Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
             StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
             ILSMComponentId next =
-                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -248,8 +251,8 @@
             // rollback the last disk component
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
-            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -302,9 +305,9 @@
             firstSearcher.waitUntilEntered();
             // now that we enetered, we will rollback
             ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
             ILSMComponentId next =
-                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+                    dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -329,8 +332,8 @@
             secondSearcher.waitUntilEntered();
             lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
-            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+            dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+            next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
             flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
             flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -738,9 +741,10 @@
                 public void run() {
                     ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                     try {
-                        dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
-                        ILSMComponentId next =
-                                dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+                        dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath)
+                                .refresh();
+                        ILSMComponentId next = dsLifecycleMgr
+                                .getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
                         long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
                         Map<String, Object> flushMap = new HashMap<>();
                         flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
index 108afa7..c727f52 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -64,8 +64,8 @@
         // Whenever this is called, it resets the counter
         // However, the counters for the failed operations are never reset since we expect them
         // To be always 0
-        return new TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
-                getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider());
+        return new TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(),
+                getIndexCheckpointManagerProvider());
     }
 
     public int getTotalFlushes() {
@@ -125,9 +125,9 @@
     public class TestLsmIoOpCallback extends LSMIOOperationCallback {
         private final TestLsmBtree lsmBtree;
 
-        public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentId id,
+        public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentIdGenerator idGenerator,
                 IIndexCheckpointManagerProvider checkpointManagerProvider) {
-            super(dsInfo, index, id, checkpointManagerProvider);
+            super(dsInfo, index, idGenerator.getId(), checkpointManagerProvider);
             lsmBtree = (TestLsmBtree) index;
         }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 22ee542..c4390fa 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -55,12 +55,13 @@
             DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
             DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
             int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
-            PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition);
+            PrimaryIndexOperationTracker opTracker =
+                    dslcManager.getOperationTracker(datasetId, partition, resource.getPath());
             if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
                 Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
                 opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
                         appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
-                        dslcManager.getComponentIdGenerator(datasetId, partition));
+                        dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()));
                 replaceMapEntry(opTrackersField, dsr, partition, opTracker);
             }
             return opTracker;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 694a0c7..54ae683 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -18,18 +18,11 @@
  */
 package org.apache.asterix.test.storage;
 
-import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
-
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.text.Format;
-import java.text.SimpleDateFormat;
 import java.util.Arrays;
-import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.common.TestDataUtil;
@@ -129,17 +122,15 @@
         TestDataUtil.upsertData(datasetName, 100);
         ncAppCtx.getDatasetLifecycleManager().flushDataset(dataset.getDatasetId(), false);
 
-        // create new invalid component with a timestamp > checkpoint valid component timestamp (i.e. in the future)
-        Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
-        Date futureTime = new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5));
-        String invalidComponentTimestamp =
-                formatter.format(futureTime) + AbstractLSMIndexFileManager.DELIMITER + formatter.format(futureTime);
+        // create new invalid component sequence with a sequence > checkpoint valid component sequence
+        String invalidComponentId = "1000";
+        String invalidComponentRange = invalidComponentId + AbstractLSMIndexFileManager.DELIMITER + invalidComponentId;
         FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
         String indexDir = indexDirRef.getFile().getAbsolutePath();
         // create the invalid component files
-        Path btreePath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+        Path btreePath = Paths.get(indexDir, invalidComponentRange + AbstractLSMIndexFileManager.DELIMITER
                 + AbstractLSMIndexFileManager.BTREE_SUFFIX);
-        Path filterPath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+        Path filterPath = Paths.get(indexDir, invalidComponentRange + AbstractLSMIndexFileManager.DELIMITER
                 + AbstractLSMIndexFileManager.BLOOM_FILTER_SUFFIX);
         Files.createFile(btreePath);
         Files.createFile(filterPath);
@@ -156,14 +147,14 @@
         DatasetResourceReference drr = DatasetResourceReference.of(localResource);
         IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
         IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
-        Optional<String> validComponentTimestamp = indexCheckpointManager.getValidComponentTimestamp();
-        Assert.assertTrue(validComponentTimestamp.isPresent());
+        long validComponentSequence = indexCheckpointManager.getValidComponentSequence();
+        Assert.assertTrue(validComponentSequence > Long.MIN_VALUE);
 
         File[] indexRemainingFiles =
                 indexDirRef.getFile().listFiles(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
         Assert.assertNotNull(indexRemainingFiles);
         long validComponentFilesCount = Arrays.stream(indexRemainingFiles)
-                .filter(file -> file.getName().startsWith(validComponentTimestamp.get())).count();
+                .filter(file -> file.getName().startsWith(String.valueOf(validComponentSequence))).count();
         Assert.assertTrue(validComponentFilesCount > 0);
     }
 
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index/btree-secondary-68.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index/btree-secondary-68.sqlpp
new file mode 100644
index 0000000..af04479
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index/btree-secondary-68.sqlpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ *  Description     : Secondary BTree Index intersection with between operator (ASTERIXDB-2448)
+ *  Expected Result : Success
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+create type tpch.OrderType as
+ closed {
+  o_orderkey : bigint,
+  o_custkey : bigint,
+  o_orderstatus : string,
+  o_totalprice : double,
+  o_orderdate : string,
+  o_orderpriority : string,
+  o_clerk : string,
+  o_shippriority : bigint,
+  o_comment : string
+};
+
+create  dataset Orders(OrderType) primary key o_orderkey;
+
+create index idx_custkey on Orders (o_custkey) type btree;
+
+create index idx_orderpriority on Orders (o_orderpriority) type btree;
+
+select o_custkey, o_orderkey, o_orderstatus from Orders
+where
+  o_orderpriority = '1-URGENT' and
+  o_custkey between 40 and 43
+order by o_custkey, o_orderkey;
+
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index/btree-secondary-68.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index/btree-secondary-68.plan
new file mode 100644
index 0000000..80def88
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index/btree-secondary-68.plan
@@ -0,0 +1,32 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$25(ASC), $$26(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$25(ASC), $$26(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- BTREE_SEARCH  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- INTERSECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STABLE_SORT [$$35(ASC)]  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STABLE_SORT [$$39(ASC)]  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/binary/concat2/concat2.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/binary/concat2/concat2.1.query.sqlpp
new file mode 100644
index 0000000..782cce2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/binary/concat2/concat2.1.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+  't1': binary_concat([null,hex('aa')]),
+  't2': binary_concat([hex('aa'),null]),
+  't3': binary_concat([null,missing,hex('aa')]),
+  't4': binary_concat([hex('aa'),null,missing])
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.1.query.sqlpp
new file mode 100644
index 0000000..554cf28
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.1.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description    : Ensure error if there's a duplicate field name in the SELECT clause
+ */
+
+select t as a, 2 as a from range(1, 10) t
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.2.query.sqlpp
new file mode 100644
index 0000000..b548b5d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.2.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description    : Ensure error if there's a duplicate field name in the closed-object-constructor() function
+ */
+
+set `import-private-functions` `true`;
+
+`closed-object-constructor`('b',get_year(current_date()),'c',[20],'c',[30]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.3.query.sqlpp
new file mode 100644
index 0000000..73810d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description    : Ensure error if there's a duplicate field name in the open-object-constructor() function
+ */
+
+set `import-private-functions` `true`;
+
+`open-object-constructor`('d' || to_string(get_year(current_date())),10,'e',[20],'e',[30]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.1.ddl.sqlpp
new file mode 100644
index 0000000..cb51012
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.1.ddl.sqlpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ *  Description     : Secondary BTree Index intersection with between operator (ASTERIXDB-2448)
+ *  Expected Result : Success
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+create type tpch.OrderType as
+ closed {
+  o_orderkey : bigint,
+  o_custkey : bigint,
+  o_orderstatus : string,
+  o_totalprice : double,
+  o_orderdate : string,
+  o_orderpriority : string,
+  o_clerk : string,
+  o_shippriority : bigint,
+  o_comment : string
+};
+
+create  dataset Orders(OrderType) primary key o_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.2.update.sqlpp
new file mode 100644
index 0000000..1097325
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.2.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+load  dataset Orders using localfs ((`path`=`asterix_nc1://data/tpch0.001/orders.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.3.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.3.ddl.sqlpp
new file mode 100644
index 0000000..62f4cc0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.3.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+create index idx_custkey on Orders (o_custkey) type btree;
+
+create index idx_orderpriority on Orders (o_orderpriority) type btree;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.4.query.sqlpp
new file mode 100644
index 0000000..75212af
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.4.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+select o_custkey, o_orderkey, o_orderstatus from Orders
+where
+  o_orderpriority = '1-URGENT' and
+  o_custkey between 40 and 43
+order by o_custkey, o_orderkey
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissing/ifmissing.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissing/ifmissing.1.query.sqlpp
index 2f0d837..0abb997 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissing/ifmissing.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissing/ifmissing.1.query.sqlpp
@@ -29,5 +29,12 @@
             missing,
             case when get_year(current_datetime()) > 0 then missing else false end,
             case when get_year(current_datetime()) > 0 then true else null end
+        ),
+   "j": (
+            let v = if_missing(
+                case when get_year(current_datetime()) > 0 then missing else false end,
+                { "c": [ 2 ] }
+            )
+            select v as b
         )
 };
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissingornull/ifmissingornull.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissingornull/ifmissingornull.1.query.sqlpp
index 32f040f..22a8acd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissingornull/ifmissingornull.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissingornull/ifmissingornull.1.query.sqlpp
@@ -32,5 +32,12 @@
             case when get_year(current_datetime()) > 0 then missing else false end,
             case when get_year(current_datetime()) > 0 then null else false end,
             case when get_year(current_datetime()) > 0 then true else missing end
+        ),
+   "j": (
+            let v = if_missing_or_null(
+                case when get_year(current_datetime()) > 0 then missing else false end,
+                { "c": [ 2 ] }
+            )
+            select v as b
         )
 };
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifnull/ifnull.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifnull/ifnull.1.query.sqlpp
index c0683bd..0121cd8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifnull/ifnull.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifnull/ifnull.1.query.sqlpp
@@ -30,5 +30,12 @@
             null,
             case when get_year(current_datetime()) > 0 then null else false end,
             case when get_year(current_datetime()) > 0 then true else missing end
+        ),
+   "j": (
+            let v = if_null(
+                case when get_year(current_datetime()) > 0 then null else false end,
+                { "c": [ 2 ] }
+            )
+            select v as b
         )
 };
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.1.query.sqlpp
new file mode 100644
index 0000000..3e48631
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.1.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+  't1': string_concat([null,'aa']),
+  't2': string_concat(['aa',null]),
+  't3': string_concat([null,missing,'aa']),
+  't4': string_concat(['aa',null,missing])
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.2.query.sqlpp
new file mode 100644
index 0000000..12208a7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.2.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+  't1': null || 'aa',
+  't2': 'aa' || null,
+  't3': null || missing || 'aa',
+  't4': 'aa' || null || missing
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/binary/concat2/concat2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/binary/concat2/concat2.1.adm
new file mode 100644
index 0000000..770f2cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/binary/concat2/concat2.1.adm
@@ -0,0 +1 @@
+{ "t1": null, "t2": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-between/intersection-with-between.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-between/intersection-with-between.4.adm
new file mode 100644
index 0000000..ad86590
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-between/intersection-with-between.4.adm
@@ -0,0 +1,7 @@
+{ "o_custkey": 40, "o_orderkey": 323, "o_orderstatus": "F" }
+{ "o_custkey": 40, "o_orderkey": 3653, "o_orderstatus": "F" }
+{ "o_custkey": 40, "o_orderkey": 4934, "o_orderstatus": "O" }
+{ "o_custkey": 43, "o_orderkey": 258, "o_orderstatus": "F" }
+{ "o_custkey": 43, "o_orderkey": 2596, "o_orderstatus": "O" }
+{ "o_custkey": 43, "o_orderkey": 3687, "o_orderstatus": "F" }
+{ "o_custkey": 43, "o_orderkey": 5378, "o_orderstatus": "F" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.8.adm
index 7fa0bce..131b860 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.8.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.8.adm
@@ -20,7 +20,7 @@
                   -- ASSIGN  |PARTITIONED|
                     exchange
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      data-scan []<-[$$22, $$t] <- test.TweetMessages condition (and(le(0, $$t.getField("user").getField("friends_count")), le($$t.getField("user").getField("friends_count"), 150))) limit 2
+                      data-scan []<-[$$22, $$t] <- test.TweetMessages condition (and(ge($$t.getField("user").getField("friends_count"), 0), le($$t.getField("user").getField("friends_count"), 150))) limit 2
                       -- DATASOURCE_SCAN  |PARTITIONED|
                         exchange
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissing/ifmissing.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissing/ifmissing.1.adm
index 0a2275f..14620a1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissing/ifmissing.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissing/ifmissing.1.adm
@@ -1 +1 @@
-{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "i": true }
\ No newline at end of file
+{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "i": true, "j": [ { "b": { "c": [ 2 ] } } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissingornull/ifmissingornull.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissingornull/ifmissingornull.1.adm
index 633c503..eff2651 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissingornull/ifmissingornull.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissingornull/ifmissingornull.1.adm
@@ -1 +1 @@
-{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "h": true, "i": true }
\ No newline at end of file
+{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "h": true, "i": true, "j": [ { "b": { "c": [ 2 ] } } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifnull/ifnull.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifnull/ifnull.1.adm
index 633c503..eff2651 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifnull/ifnull.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifnull/ifnull.1.adm
@@ -1 +1 @@
-{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "h": true, "i": true }
\ No newline at end of file
+{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "h": true, "i": true, "j": [ { "b": { "c": [ 2 ] } } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.1.adm
new file mode 100644
index 0000000..770f2cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.1.adm
@@ -0,0 +1 @@
+{ "t1": null, "t2": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.2.adm
new file mode 100644
index 0000000..770f2cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.2.adm
@@ -0,0 +1 @@
+{ "t1": null, "t2": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0425b79..b5fc053 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -92,6 +92,14 @@
         <expected-error>Duplicate alias definitions: s1</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="flwor">
+      <compilation-unit name="query-ASTERIXDB-2446">
+        <output-dir compare="Text">query-ASTERIXDB-883</output-dir>
+        <expected-error>ASX0013: Duplicate field name "a"</expected-error>
+        <expected-error>ASX0013: Duplicate field name "c"</expected-error>
+        <expected-error>ASX0013: Duplicate field name "e"</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="explain">
     <test-case FilePath="explain">
@@ -3367,6 +3375,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="index-selection">
+      <compilation-unit name="intersection-with-between">
+        <output-dir compare="Text">intersection-with-between</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="index-selection">
       <compilation-unit name="inverted-index-ngram-contains">
         <output-dir compare="Text">inverted-index-ngram-contains</output-dir>
       </compilation-unit>
@@ -6611,6 +6624,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="string">
+      <compilation-unit name="string-concat2">
+        <output-dir compare="Text">string-concat2</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="string">
       <compilation-unit name="string-equal1">
         <output-dir compare="Text">string-equal1</output-dir>
       </compilation-unit>
@@ -9829,6 +9847,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="binary">
+      <compilation-unit name="concat2">
+        <output-dir compare="Text">concat2</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="binary">
       <compilation-unit name="subbinary">
         <output-dir compare="Text">subbinary</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 4441c6e..d18b6ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -40,6 +40,15 @@
     IIndex getIndex(int datasetId, long indexId) throws HyracksDataException;
 
     /**
+     * Indicates if the dataset with id {@code datasetId} is currently registered
+     * with this {@link IDatasetLifecycleManager}
+     *
+     * @param datasetId
+     * @return true if the dataset is currently registered. Otherwise false.
+     */
+    boolean isRegistered(int datasetId);
+
+    /**
      * Flushes all open datasets synchronously.
      *
      * @throws HyracksDataException
@@ -76,18 +85,20 @@
      *
      * @param datasetId
      * @param partition
+     * @param path
      * @return
      */
-    PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition);
+    PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path);
 
     /**
      * creates (if necessary) and returns the component Id generator of a dataset.
      *
      * @param datasetId
      * @param partition
+     * @param path
      * @return
      */
-    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition);
+    ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path);
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 6476e68..811da78 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -53,7 +53,7 @@
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
         int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
-        return dslcManager.getComponentIdGenerator(datasetId, partition);
+        return dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath());
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 50a4bef..1dff69d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.context;
 
 import static org.apache.asterix.common.metadata.MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS;
+import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -37,7 +38,9 @@
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
@@ -322,32 +325,36 @@
     }
 
     @Override
-    public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) {
+    public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
         DatasetResource dataset = datasets.get(datasetId);
         PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
         if (opTracker == null) {
-            populateOpTrackerAndIdGenerator(dataset, partition);
+            populateOpTrackerAndIdGenerator(dataset, partition, path);
             opTracker = dataset.getOpTracker(partition);
         }
         return opTracker;
     }
 
     @Override
-    public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) {
+    public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
         DatasetResource dataset = datasets.get(datasetId);
-        if (dataset == null) {
-            return null;
-        }
         ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
         if (generator == null) {
-            populateOpTrackerAndIdGenerator(dataset, partition);
+            populateOpTrackerAndIdGenerator(dataset, partition, path);
             generator = dataset.getComponentIdGenerator(partition);
         }
         return generator;
     }
 
-    private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) {
-        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum());
+    @Override
+    public synchronized boolean isRegistered(int datasetId) {
+        return datasets.containsKey(datasetId);
+    }
+
+    private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition, String path) {
+        final long lastValidId = getDatasetLastValidComponentId(path);
+        ILSMComponentIdGenerator idGenerator =
+                new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(), lastValidId);
         PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
                 logManager, dataset.getDatasetInfo(), idGenerator);
         dataset.setPrimaryIndexOperationTracker(partition, opTracker);
@@ -600,4 +607,18 @@
             indexInfo.setOpen(false);
         }
     }
+
+    private long getDatasetLastValidComponentId(String indexPath) {
+        try {
+            final ResourceReference indexRef = ResourceReference.ofIndex(indexPath);
+            final ResourceReference primaryIndexRef = indexRef.getDatasetReference();
+            final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(primaryIndexRef);
+            if (indexCheckpointManager.getCheckpointCount() > 0) {
+                return Math.max(indexCheckpointManager.getLatest().getLastComponentId(), MIN_VALID_COMPONENT_ID);
+            }
+            return MIN_VALID_COMPONENT_ID;
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index ea53d68..0aa46a8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -44,9 +44,9 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -73,12 +73,12 @@
     private int pendingFlushes = 0;
     private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
 
-    public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId nextComponentId,
+    public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId,
             IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.dsInfo = dsInfo;
         this.lsmIndex = lsmIndex;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
-        componentIds.add(nextComponentId);
+        componentIds.add(componentId);
     }
 
     @Override
@@ -132,8 +132,8 @@
                 operation.getIOOpertionType() == LSMIOOperationType.FLUSH ? (Long) map.get(KEY_FLUSH_LOG_LSN) : 0L;
         final LSMComponentId id = (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID);
         final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
-        final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
-        indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn, id.getMaxId());
+        final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
+        indexCheckpointManagerProvider.get(ref).flushed(componentSequence, lsn, id.getMaxId());
     }
 
     private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
@@ -275,6 +275,6 @@
 
     @Override
     public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
-        // No Op
+        // no op
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index 2c0872c..dd9ede5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -27,33 +27,33 @@
     /**
      * Initializes the first checkpoint of an index with low watermark {@code lsn}
      *
-     * @param componentTimestamp
+     * @param validComponentSequence
      * @param lsn
      * @throws HyracksDataException
      */
-    void init(String componentTimestamp, long lsn) throws HyracksDataException;
+    void init(long validComponentSequence, long lsn) throws HyracksDataException;
 
     /**
      * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
-     * with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
+     * with the latest valid {@code componentSequence} and low watermark {@code lsn}
      *
-     * @param componentTimestamp
+     * @param componentSequence
      * @param lsn
      * @throws HyracksDataException
      */
-    void flushed(String componentTimestamp, long lsn, long componentId) throws HyracksDataException;
+    void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException;
 
     /**
      * Called when a new LSM disk component is replicated from master. When called, the index checkpoint is updated
-     * with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
+     * with the latest valid {@code componentSequence} and the local lsn mapping of {@code masterLsn} is set as the
      * new low watermark.
      *
-     * @param componentTimestamp
+     * @param componentSequence
      * @param masterLsn
      * @param componentId
      * @throws HyracksDataException
      */
-    void replicated(String componentTimestamp, long masterLsn, long componentId) throws HyracksDataException;
+    void replicated(long componentSequence, long masterLsn, long componentId) throws HyracksDataException;
 
     /**
      * Called when a flush log is received and replicated from master. The mapping between
@@ -88,12 +88,12 @@
     void delete();
 
     /**
-     * Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()}
+     * Gets the index last valid component sequence.
      *
-     * @return the index last valid component timestamp
+     * @return the index last valid component sequence
      * @throws HyracksDataException
      */
-    Optional<String> getValidComponentTimestamp() throws HyracksDataException;
+    long getValidComponentSequence() throws HyracksDataException;
 
     /**
      * Gets the number of valid checkpoints the index has.
@@ -110,12 +110,12 @@
     IndexCheckpoint getLatest() throws HyracksDataException;
 
     /**
-     * Advance the last valid component timestamp. Used for replicated bulkloaded components
+     * Advance the last valid component sequence. Used for replicated bulkloaded components
      *
-     * @param timeStamp
+     * @param componentSequence
      * @throws HyracksDataException
      */
-    void advanceValidComponentTimestamp(String timeStamp) throws HyracksDataException;
+    void advanceValidComponentSequence(long componentSequence) throws HyracksDataException;
 
     /**
      * Set the last component id. Used during recovery or after component delete
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 73d3122..f84167e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -34,22 +34,22 @@
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final long INITIAL_CHECKPOINT_ID = 0;
     private long id;
-    private String validComponentTimestamp;
+    private long validComponentSequence;
     private long lowWatermark;
     private long lastComponentId;
     private Map<Long, Long> masterNodeFlushMap;
 
-    public static IndexCheckpoint first(String lastComponentTimestamp, long lowWatermark) {
+    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) {
         IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
         firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
         firstCheckpoint.lowWatermark = lowWatermark;
-        firstCheckpoint.validComponentTimestamp = lastComponentTimestamp;
+        firstCheckpoint.validComponentSequence = lastComponentSequence;
         firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
         firstCheckpoint.masterNodeFlushMap = new HashMap<>();
         return firstCheckpoint;
     }
 
-    public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp,
+    public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
             long lastComponentId) {
         if (lowWatermark < latest.getLowWatermark()) {
             throw new IllegalStateException("Low watermark should always be increasing");
@@ -58,7 +58,7 @@
         next.id = latest.getId() + 1;
         next.lowWatermark = lowWatermark;
         next.lastComponentId = lastComponentId;
-        next.validComponentTimestamp = validComponentTimestamp;
+        next.validComponentSequence = validComponentSequence;
         next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
         // remove any lsn from the map that wont be used anymore
         next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark);
@@ -69,8 +69,8 @@
     private IndexCheckpoint() {
     }
 
-    public String getValidComponentTimestamp() {
-        return validComponentTimestamp;
+    public long getValidComponentSequence() {
+        return validComponentSequence;
     }
 
     public long getLowWatermark() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 8bd848d..a24bf72 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -38,6 +38,10 @@
     protected ResourceReference() {
     }
 
+    public static ResourceReference ofIndex(String indexPath) {
+        return of(new File(indexPath, StorageConstants.METADATA_FILE_NAME).toString());
+    }
+
     public static ResourceReference of(String localResourcePath) {
         ResourceReference lrr = new ResourceReference();
         parse(lrr, localResourcePath);
@@ -72,6 +76,11 @@
         return Paths.get(root, partition, dataverse, dataset, rebalance, index);
     }
 
+    public ResourceReference getDatasetReference() {
+        return ResourceReference
+                .ofIndex(Paths.get(root, partition, dataverse, dataset, rebalance, dataset).toFile().getPath());
+    }
+
     public Path getFileRelativePath() {
         return Paths.get(root, partition, dataverse, dataset, rebalance, index, name);
     }
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index f0dfd1a..7158b95 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -43,8 +43,8 @@
 
 # Data errors
 6 = Invalid format for %1$s in %2$s
-7 = Overflow happend in %1$s
-8 = Underflow happend in %1$s
+7 = Overflow in %1$s
+8 = Underflow in %1$s
 9 = Injected failure in %1$s
 10 = Invalid value: function %1$s expects its %2$s input parameter to be a non-negative value, but gets %3$s
 11 = Index out of bound in %1$s: %2$s
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 29a2aa0..db0911b 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -19,9 +19,8 @@
 
 package org.apache.asterix.test.ioopcallbacks;
 
-import java.text.Format;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -38,7 +37,6 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -64,13 +62,11 @@
      * 7. destroy
      */
 
-    private static final Format FORMATTER =
-            new SimpleDateFormat(AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT);
+    private static long COMPONENT_SEQUENCE = 0;
 
     private static String getComponentFileName() {
-        Date date = new Date();
-        String ts = FORMATTER.format(date);
-        return ts + '_' + ts;
+        final String sequence = String.valueOf(COMPONENT_SEQUENCE++);
+        return sequence + '_' + sequence;
     }
 
     @Test
@@ -82,7 +78,7 @@
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
         DatasetInfo dsInfo = new DatasetInfo(101, null);
-        LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
+        LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
                 mockIndexCheckpointManagerProvider());
         //Flush first
@@ -141,7 +137,7 @@
     public void testAllocateComponentId() throws HyracksDataException {
         int numMemoryComponents = 2;
         DatasetInfo dsInfo = new DatasetInfo(101, null);
-        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
@@ -153,9 +149,6 @@
         idGenerator.refresh();
         long flushLsn = 1L;
         ILSMComponentId nextComponentId = idGenerator.getId();
-        Map<String, Object> flushMap = new HashMap<>();
-        flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
-        flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
         callback.allocated(mockComponent);
         callback.recycled(mockComponent);
         checkMemoryComponent(initialId, mockComponent);
@@ -165,7 +158,7 @@
     public void testRecycleComponentId() throws HyracksDataException {
         int numMemoryComponents = 2;
         DatasetInfo dsInfo = new DatasetInfo(101, null);
-        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
+        ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
@@ -223,7 +216,8 @@
         IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
                 Mockito.mock(IIndexCheckpointManagerProvider.class);
         IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class);
-        Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong(), Mockito.anyLong());
+        Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.anyLong(), Mockito.anyLong(),
+                Mockito.anyLong());
         Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
         return indexCheckpointManagerProvider;
     }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/OperatorExpressionVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/OperatorExpressionVisitor.java
index 372f0fa..23b4d60 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/OperatorExpressionVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/OperatorExpressionVisitor.java
@@ -136,13 +136,13 @@
         Expression left = operatorExpr.getExprList().get(1);
         Expression right = operatorExpr.getExprList().get(2);
 
-        // Creates the expression left <= target.
-        Expression leftComparison =
-                createLessThanExpression(left, target, operatorExpr.getHints(), operatorExpr.getSourceLocation());
+        // Creates the expression target >= left.
+        Expression leftComparison = createOperatorExpression(OperatorType.GE, target, left, operatorExpr.getHints(),
+                operatorExpr.getSourceLocation());
         // Creates the expression target <= right.
         Expression targetCopy = (Expression) SqlppRewriteUtil.deepCopy(target);
-        Expression rightComparison =
-                createLessThanExpression(targetCopy, right, operatorExpr.getHints(), operatorExpr.getSourceLocation());
+        Expression rightComparison = createOperatorExpression(OperatorType.LE, targetCopy, right,
+                operatorExpr.getHints(), operatorExpr.getSourceLocation());
         OperatorExpr andExpr = new OperatorExpr();
         andExpr.addOperand(leftComparison);
         andExpr.addOperand(rightComparison);
@@ -158,12 +158,12 @@
         }
     }
 
-    private Expression createLessThanExpression(Expression lhs, Expression rhs, List<IExpressionAnnotation> hints,
-            SourceLocation sourceLoc) {
+    private Expression createOperatorExpression(OperatorType opType, Expression lhs, Expression rhs,
+            List<IExpressionAnnotation> hints, SourceLocation sourceLoc) {
         OperatorExpr comparison = new OperatorExpr();
         comparison.addOperand(lhs);
         comparison.addOperand(rhs);
-        comparison.addOperator(OperatorType.LE);
+        comparison.addOperator(opType);
         comparison.setSourceLocation(sourceLoc);
         if (hints != null) {
             for (IExpressionAnnotation hint : hints) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
index 561df5e..b7bd8ca 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
@@ -106,7 +106,7 @@
         // Gets the actual types for UNIONs and mark unknownable to be true.
         if (leftTypeTag == ATypeTag.UNION || rightTypeTag == ATypeTag.UNION) {
             leftType = TypeComputeUtils.getActualType(leftType);
-            rightType = TypeComputeUtils.getActualType(leftType);
+            rightType = TypeComputeUtils.getActualType(rightType);
             leftTypeTag = leftType.getTypeTag();
             rightTypeTag = rightType.getTypeTag();
             unknownable = true;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index d8eeefc..49db062 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -68,6 +68,7 @@
 import org.apache.asterix.om.typecomputer.impl.CollectionMemberResultType;
 import org.apache.asterix.om.typecomputer.impl.CollectionToSequenceTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.ConcatNonNullTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.ConcatTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.DoubleIfTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.FieldAccessByIndexResultType;
 import org.apache.asterix.om.typecomputer.impl.FieldAccessByNameResultType;
@@ -1240,7 +1241,7 @@
         addFunction(BINARY_LENGTH, UnaryBinaryInt64TypeComputer.INSTANCE, true);
         addFunction(PARSE_BINARY, ABinaryTypeComputer.INSTANCE, true);
         addFunction(PRINT_BINARY, AStringTypeComputer.INSTANCE, true);
-        addFunction(BINARY_CONCAT, ABinaryTypeComputer.INSTANCE, true);
+        addFunction(BINARY_CONCAT, ConcatTypeComputer.INSTANCE_BINARY, true);
         addFunction(SUBBINARY_FROM, ABinaryTypeComputer.INSTANCE, true);
         addFunction(SUBBINARY_FROM_TO, ABinaryTypeComputer.INSTANCE, true);
         addFunction(FIND_BINARY, AInt64TypeComputer.INSTANCE, true);
@@ -1251,7 +1252,7 @@
         addFunction(STRING_CONTAINS, ABooleanTypeComputer.INSTANCE, true);
         addFunction(STRING_TO_CODEPOINT, StringToInt64ListTypeComputer.INSTANCE, true);
         addFunction(CODEPOINT_TO_STRING, AStringTypeComputer.INSTANCE, true);
-        addFunction(STRING_CONCAT, AStringTypeComputer.INSTANCE, true);
+        addFunction(STRING_CONCAT, ConcatTypeComputer.INSTANCE_STRING, true);
         addFunction(SUBSTRING2, StringIntToStringTypeComputer.INSTANCE_NULLABLE, true);
         addFunction(STRING_LENGTH, UnaryStringInt64TypeComputer.INSTANCE, true);
         addFunction(STRING_LOWERCASE, StringStringTypeComputer.INSTANCE, true);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
index d5e576e..3500435 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
@@ -21,6 +21,8 @@
 
 import java.util.Iterator;
 
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.om.exceptions.InvalidExpressionException;
 import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
@@ -68,12 +70,18 @@
                 AUnionType unionType = (AUnionType) e2Type;
                 e2Type = AUnionType.createUnknownableType(unionType.getActualType());
             }
-            fieldTypes[i] = e2Type;
-            fieldNames[i] = ConstantExpressionUtil.getStringConstant(e1);
-            if (fieldNames[i] == null) {
+            String fieldName = ConstantExpressionUtil.getStringConstant(e1);
+            if (fieldName == null) {
                 throw new InvalidExpressionException(f.getSourceLocation(), funcName, 2 * i, e1,
                         LogicalExpressionTag.CONSTANT);
             }
+            for (int j = 0; j < i; j++) {
+                if (fieldName.equals(fieldNames[j])) {
+                    throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, f.getSourceLocation(), fieldName);
+                }
+            }
+            fieldTypes[i] = e2Type;
+            fieldNames[i] = fieldName;
             i++;
         }
         return new ARecordType(null, fieldNames, fieldTypes, false);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ConcatTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ConcatTypeComputer.java
new file mode 100644
index 0000000..db59877
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ConcatTypeComputer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+
+public class ConcatTypeComputer extends AbstractResultTypeComputer {
+
+    public static final ConcatTypeComputer INSTANCE_STRING = new ConcatTypeComputer(BuiltinType.ASTRING);
+
+    public static final ConcatTypeComputer INSTANCE_BINARY = new ConcatTypeComputer(BuiltinType.ABINARY);
+
+    private final IAType resultType;
+
+    private ConcatTypeComputer(IAType resultType) {
+        this.resultType = resultType;
+    }
+
+    @Override
+    protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException {
+        IAType argType = strippedInputTypes[0];
+        IAType outputType = resultType;
+        if (!argType.getTypeTag().isListType() || isUnknownable(((AbstractCollectionType) argType).getItemType())) {
+            outputType = AUnionType.createUnknownableType(outputType);
+        }
+        return outputType;
+    }
+
+    private boolean isUnknownable(IAType type) {
+        switch (type.getTypeTag()) {
+            case ANY:
+            case MISSING:
+            case NULL:
+                return true;
+            case UNION:
+                return ((AUnionType) type).isUnknownableType();
+            default:
+                return false;
+        }
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
index 1442dfe..6deb17c 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
@@ -25,6 +25,8 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
 import org.apache.asterix.om.types.ARecordType;
@@ -72,6 +74,9 @@
             IAType t2 = (IAType) env.getType(e2);
             String fieldName = ConstantExpressionUtil.getStringConstant(e1);
             if (fieldName != null && t2 != null && TypeHelper.isClosed(t2)) {
+                if (namesList.contains(fieldName)) {
+                    throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, f.getSourceLocation(), fieldName);
+                }
                 namesList.add(fieldName);
                 if (t2.getTypeTag() == ATypeTag.UNION) {
                     AUnionType unionType = (AUnionType) t2;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index d4d601c..448613b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.common.LocalResource;
 
 /**
@@ -62,20 +63,19 @@
             DatasetResourceReference ref = DatasetResourceReference.of(ls);
             final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
             indexCheckpointManager.delete();
-            // Get most recent timestamp of existing files to avoid deletion
+            // Get most recent sequence of existing files to avoid deletion
             Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
             String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
             if (files == null) {
                 throw HyracksDataException
                         .create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
             }
-            String mostRecentTimestamp = null;
+            long maxComponentSequence = Long.MIN_VALUE;
             for (String file : files) {
-                String nextTimeStamp = AbstractLSMIndexFileManager.getComponentEndTime(file);
-                mostRecentTimestamp = mostRecentTimestamp == null || nextTimeStamp.compareTo(mostRecentTimestamp) > 0
-                        ? nextTimeStamp : mostRecentTimestamp;
+                maxComponentSequence =
+                        Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
             }
-            indexCheckpointManager.init(mostRecentTimestamp, currentLSN);
+            indexCheckpointManager.init(maxComponentSequence, currentLSN);
         }
         ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index d5dc51d..55dd5d4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -63,8 +63,8 @@
         final IIOManager ioManager = appCtx.getIoManager();
         final FileReference localPath = ioManager.resolve(componentFile);
         final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
-        final String componentId = PersistentLocalResourceRepository.getComponentId(componentFile);
-        return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId);
+        final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile);
+        return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
     }
 
     @Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index a4f9b43..b360a09 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.replication.sync.IndexSynchronizer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 
 /**
  * A task to mark a replicated LSM component as valid
@@ -57,7 +58,7 @@
     public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
         try {
             if (masterLsn == IndexSynchronizer.BULKLOAD_LSN) {
-                updateBulkLoadedLastComponentTimestamp(appCtx);
+                updateBulkLoadedLastComponentSequence(appCtx);
             } else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
                 ensureComponentLsnFlushed(appCtx);
             }
@@ -70,13 +71,12 @@
         }
     }
 
-    private void updateBulkLoadedLastComponentTimestamp(INcApplicationContext appCtx) throws HyracksDataException {
+    private void updateBulkLoadedLastComponentSequence(INcApplicationContext appCtx) throws HyracksDataException {
         final ResourceReference indexRef = ResourceReference.of(file);
         final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
         final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
-        final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
-        indexCheckpointManager.advanceValidComponentTimestamp(componentEndTime);
-
+        final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
+        indexCheckpointManager.advanceValidComponentSequence(componentSequence);
     }
 
     private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
@@ -95,8 +95,8 @@
                 indexCheckpointManager.wait(replicationTimeOut);
                 replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
             }
-            final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
-            indexCheckpointManager.replicated(componentEndTime, masterLsn, lastComponentId);
+            final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
+            indexCheckpointManager.replicated(componentSequence, masterLsn, lastComponentId);
         }
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 20663d1..f53d448 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -98,7 +98,7 @@
         final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         indexCheckpointManager.delete();
-        indexCheckpointManager.init(null, currentLSN);
+        indexCheckpointManager.init(Long.MIN_VALUE, currentLSN);
         LOGGER.info(() -> "Checkpoint index: " + indexRef);
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
index cb213ae..a2872bf 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
@@ -94,6 +94,7 @@
                             listAccessor.reset(listBytes, listOffset);
                             // calculate length first
                             int utf8Len = 0;
+                            boolean itemIsNull = false;
                             for (int i = 0; i < listAccessor.size(); i++) {
                                 int itemOffset = listAccessor.getItemOffset(i);
                                 ATypeTag itemType = listAccessor.getItemType(itemOffset);
@@ -104,9 +105,8 @@
                                 }
                                 if (itemType != ATypeTag.STRING) {
                                     if (itemType == ATypeTag.NULL) {
-                                        nullSerde.serialize(ANull.NULL, out);
-                                        result.set(resultStorage);
-                                        return;
+                                        itemIsNull = true;
+                                        continue;
                                     }
                                     if (itemType == ATypeTag.MISSING) {
                                         missingSerde.serialize(AMissing.MISSING, out);
@@ -118,6 +118,11 @@
                                 }
                                 utf8Len += UTF8StringUtil.getUTFLength(listBytes, itemOffset);
                             }
+                            if (itemIsNull) {
+                                nullSerde.serialize(ANull.NULL, out);
+                                result.set(resultStorage);
+                                return;
+                            }
                             out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
                             int cbytes = UTF8StringUtil.encodeUTF8Length(utf8Len, tempLengthArray, 0);
                             out.write(tempLengthArray, 0, cbytes);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
index 59f3396..4eaa1d1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
@@ -94,11 +94,17 @@
                         try {
                             listAccessor.reset(data, offset);
                             int concatLength = 0;
+                            boolean itemIsNull = false;
                             for (int i = 0; i < listAccessor.size(); i++) {
                                 int itemOffset = listAccessor.getItemOffset(i);
                                 ATypeTag itemType = listAccessor.getItemType(itemOffset);
                                 if (itemType != ATypeTag.BINARY) {
-                                    if (serializeUnknownIfAnyUnknown(itemType)) {
+                                    if (itemType == ATypeTag.NULL) {
+                                        itemIsNull = true;
+                                        continue;
+                                    }
+                                    if (itemType == ATypeTag.MISSING) {
+                                        missingSerde.serialize(AMissing.MISSING, dataOutput);
                                         result.set(resultStorage);
                                         return;
                                     }
@@ -107,6 +113,11 @@
                                 }
                                 concatLength += ByteArrayPointable.getContentLength(data, itemOffset);
                             }
+                            if (itemIsNull) {
+                                nullSerde.serialize(ANull.NULL, dataOutput);
+                                result.set(resultStorage);
+                                return;
+                            }
                             dataOutput.writeByte(ATypeTag.SERIALIZED_BINARY_TYPE_TAG);
                             int metaLen = VarLenIntEncoderDecoder.encode(concatLength, metaBuffer, 0);
                             dataOutput.write(metaBuffer, 0, metaLen);
@@ -122,20 +133,6 @@
                         }
                         result.set(resultStorage);
                     }
-
-                    private boolean serializeUnknownIfAnyUnknown(ATypeTag... tags) throws HyracksDataException {
-                        for (ATypeTag typeTag : tags) {
-                            if (typeTag == ATypeTag.NULL) {
-                                nullSerde.serialize(ANull.NULL, dataOutput);
-                                return true;
-                            }
-                            if (typeTag == ATypeTag.MISSING) {
-                                missingSerde.serialize(AMissing.MISSING, dataOutput);
-                                return true;
-                            }
-                        }
-                        return false;
-                    }
                 };
             }
         };
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index 8ea1fa7..b123a5e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -26,12 +26,15 @@
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 // To avoid the computation cost for checking the capacity constraint for each node,
 // currently the admit/allocation decisions are based on the aggregated resource information.
 // TODO(buyingyi): investigate partition-aware resource control.
 public class JobCapacityController implements IJobCapacityController {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private final IResourceManager resourceManager;
 
     public JobCapacityController(IResourceManager resourceManager) {
@@ -71,6 +74,16 @@
         int aggregatedNumCores = currentCapacity.getAggregatedCores();
         currentCapacity.setAggregatedMemoryByteSize(aggregatedMemoryByteSize + reqAggregatedMemoryByteSize);
         currentCapacity.setAggregatedCores(aggregatedNumCores + reqAggregatedNumCores);
+        ensureMaxCapacity();
     }
 
+    private void ensureMaxCapacity() {
+        final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
+        final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
+        if (currentCapacity.getAggregatedCores() > maximumCapacity.getAggregatedCores()
+                || currentCapacity.getAggregatedMemoryByteSize() > maximumCapacity.getAggregatedMemoryByteSize()) {
+            LOGGER.warn("Current cluster available capacity {} is more than its maximum capacity {}", currentCapacity,
+                    maximumCapacity);
+        }
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index e353714..ab84211 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -49,7 +49,7 @@
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
         int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
-        return dslcManager.getOperationTracker(datasetId, partition);
+        return dslcManager.getOperationTracker(datasetId, partition, resource.getPath());
     }
 
     @Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index f9718c4..c0da095 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -22,7 +22,6 @@
 import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
 import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
-import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -30,12 +29,9 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.text.Format;
 import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -71,6 +67,7 @@
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.util.ExitUtil;
@@ -129,9 +126,6 @@
         }
     };
 
-    private static final ThreadLocal<SimpleDateFormat> THREAD_LOCAL_FORMATTER =
-            ThreadLocal.withInitial(() -> new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT));
-
     // Finals
     private final IIOManager ioManager;
     private final Cache<String, LocalResource> resourceCache;
@@ -202,7 +196,7 @@
             byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
             final Path path = Paths.get(resourceFile.getAbsolutePath());
             Files.write(path, bytes);
-            indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(null, 0);
+            indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0);
             deleteResourceFileMask(resourceFile);
         } catch (Exception e) {
             cleanup(resourceFile);
@@ -481,30 +475,18 @@
     }
 
     private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
-        final Format formatter = THREAD_LOCAL_FORMATTER.get();
         final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
         if (indexComponentFiles == null) {
             throw new IOException(index + " doesn't exist or an IO error occurred");
         }
-        final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
-        if (!validComponentTimestamp.isPresent()) {
-            // index doesn't have any valid component, delete all
-            for (File componentFile : indexComponentFiles) {
+        final long validComponentSequence = getIndexCheckpointManager(index).getValidComponentSequence();
+        for (File componentFile : indexComponentFiles) {
+            // delete any file with start sequence > valid component sequence
+            final long fileStart = IndexComponentFileReference.of(componentFile.getName()).getSequenceStart();
+            if (fileStart > validComponentSequence) {
                 LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
                 Files.delete(componentFile.toPath());
             }
-        } else {
-            final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
-            for (File componentFile : indexComponentFiles) {
-                // delete any file with startTime > validTimestamp
-                final String fileStartTimeStr =
-                        AbstractLSMIndexFileManager.getComponentStartTime(componentFile.getName());
-                final Date fileStartTime = (Date) formatter.parseObject(fileStartTimeStr);
-                if (fileStartTime.after(validTimestamp)) {
-                    LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
-                    Files.delete(componentFile.toPath());
-                }
-            }
         }
     }
 
@@ -545,8 +527,8 @@
                     long fileSize = file.length();
                     totalSize += fileSize;
                     if (isComponentFile(resolvedPath.getFile(), file.getName())) {
-                        String componentId = getComponentId(file.getAbsolutePath());
-                        componentsStats.put(componentId, componentsStats.getOrDefault(componentId, 0L) + fileSize);
+                        String componentSeq = getComponentSequence(file.getAbsolutePath());
+                        componentsStats.put(componentSeq, componentsStats.getOrDefault(componentSeq, 0L) + fileSize);
                     }
                 }
             }
@@ -576,17 +558,16 @@
     }
 
     /**
-     * Gets a component id based on its unique timestamp.
-     * e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
-     * will return a component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439
+     * Gets a component sequence based on its unique timestamp.
+     * e.g. a component file 1_3_b
+     * will return a component sequence 1_3
      *
-     * @param componentFile
-     *            any component file
-     * @return The component id
+     * @param componentFile any component file
+     * @return The component sequence
      */
-    public static String getComponentId(String componentFile) {
+    public static String getComponentSequence(String componentFile) {
         final ResourceReference ref = ResourceReference.of(componentFile);
-        return ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
+        return IndexComponentFileReference.of(ref.getName()).getSequence();
     }
 
     private static boolean isComponentMask(File mask) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index ff1e47f..962f826 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -97,4 +97,11 @@
      * @throws HyracksDataException
      */
     FileReference createWorkspaceFile(String prefix) throws HyracksDataException;
+
+    /**
+     * Gets the total disk usage in bytes of this {@link IIOManager} io devices handles.
+     *
+     * @return the total disk usage in bytes
+     */
+    long getTotalDiskUsage();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 334fb5c..261e7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hyracks.comm.channels;
 
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.Objects;
 
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -75,7 +77,15 @@
                         break;
                     }
                     try {
-                        wait();
+                        InetSocketAddress remoteAddress = ccb.getRemoteAddress();
+                        String nameBefore = Thread.currentThread().getName();
+                        try {
+                            Thread.currentThread()
+                                    .setName(nameBefore + ":SendingTo(" + Objects.toString(remoteAddress) + ")");
+                            wait();
+                        } finally {
+                            Thread.currentThread().setName(nameBefore);
+                        }
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
                         throw HyracksDataException.create(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index f7067ff..b2e4a5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -216,7 +216,7 @@
         webServer.start();
         info = new ClusterControllerInfo(ccId, ccConfig.getClientPublicAddress(), ccConfig.getClientPublicPort(),
                 ccConfig.getConsolePublicPort());
-        timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis());
+        timer.schedule(sweeper, 0, ccConfig.getDeadNodeSweepThreshold());
         jobLog.open();
         startApplication();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 7dc636c..7e1ca61 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -229,8 +229,9 @@
         }
         run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
         run.setEndTime(System.currentTimeMillis());
-        if (activeRunMap.remove(jobId) == null) {
-            LOGGER.warn("Job {} was not found running but is getting archived and capacity released", jobId);
+        if (activeRunMap.remove(jobId) != null) {
+            // non-active jobs have zero capacity
+            releaseJobCapacity(run);
         }
         runMapArchive.put(jobId, run);
         runMapHistory.put(jobId, run.getExceptions());
@@ -247,10 +248,6 @@
             }
         }
 
-        // Releases cluster capacitys occupied by the job.
-        JobSpecification job = run.getJobSpecification();
-        jobCapacityController.release(job);
-
         // Picks the next job to execute.
         pickJobsToRun();
 
@@ -347,4 +344,9 @@
             throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
         }
     }
+
+    private void releaseJobCapacity(JobRun jobRun) {
+        final JobSpecification job = jobRun.getJobSpecification();
+        jobCapacityController.release(job);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df6e..1cb2d05 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -60,6 +60,7 @@
         CONSOLE_PUBLIC_PORT(INTEGER, CONSOLE_LISTEN_PORT),
         HEARTBEAT_PERIOD(LONG, 10000L), // TODO (mblow): add time unit
         HEARTBEAT_MAX_MISSES(INTEGER, 5),
+        DEAD_NODE_SWEEP_THRESHOLD(LONG, HEARTBEAT_PERIOD),
         PROFILE_DUMP_PERIOD(INTEGER, 0),
         JOB_HISTORY_SIZE(INTEGER, 10),
         RESULT_TTL(LONG, 86400000L), // TODO(mblow): add time unit
@@ -154,7 +155,9 @@
                 case HEARTBEAT_PERIOD:
                     return "Sets the time duration between two heartbeats from each node controller in milliseconds";
                 case HEARTBEAT_MAX_MISSES:
-                    return "Sets the maximum number of missed heartbeats before a node is marked as dead";
+                    return "Sets the maximum number of missed heartbeats before a node can be considered dead";
+                case DEAD_NODE_SWEEP_THRESHOLD:
+                    return "Sets the frequency (in milliseconds) to process nodes that can be considered dead";
                 case PROFILE_DUMP_PERIOD:
                     return "Sets the time duration between two profile dumps from each node controller in "
                             + "milliseconds; 0 to disable";
@@ -326,6 +329,14 @@
         configManager.set(Option.HEARTBEAT_MAX_MISSES, heartbeatMaxMisses);
     }
 
+    public long getDeadNodeSweepThreshold() {
+        return getAppConfig().getLong(Option.DEAD_NODE_SWEEP_THRESHOLD);
+    }
+
+    public void setDeadNodeSweepThreshold(long deadNodeSweepThreshold) {
+        configManager.set(Option.DEAD_NODE_SWEEP_THRESHOLD, deadNodeSweepThreshold);
+    }
+
     public int getProfileDumpPeriod() {
         return getAppConfig().getInt(Option.PROFILE_DUMP_PERIOD);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
index 0ea6399..08b8c11 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
@@ -63,7 +63,6 @@
     }
 
     public void notifyAck(HyracksDataException exception) {
-        // TODO: we should also reregister in case of no ack
         LOGGER.debug("ack rec'd from {} w/ exception: {}", ccId::toString, () -> String.valueOf(exception));
         if (exception != null && exception.matches(ErrorCode.HYRACKS, ErrorCode.NO_SUCH_NODE)) {
             LOGGER.info("{} indicates it does not recognize us; force a reconnect", ccId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index d13ca8d..b5cb21a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -416,4 +417,13 @@
             Thread.currentThread().interrupt();
         }
     }
+
+    @Override
+    public long getTotalDiskUsage() {
+        long totalSize = 0;
+        for (IODeviceHandle handle : ioDevices) {
+            totalSize += FileUtils.sizeOfDirectory(handle.getMount());
+        }
+        return totalSize;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 998acfb..f7ef2aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.net.protocols.muxdemux;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -163,4 +164,8 @@
                 + " remoteClose: " + remoteClose + " remoteCloseAck:" + remoteCloseAck + " readCredits: "
                 + ri.getCredits() + " writeCredits: " + wi.getCredits() + "]";
     }
+
+    public InetSocketAddress getRemoteAddress() {
+        return cSet.getMultiplexedConnection().getRemoteAddress();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index 31a37ef..f5cdf2c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -239,4 +239,8 @@
             ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
         }
     }
+
+    public MultiplexedConnection getMultiplexedConnection() {
+        return mConn;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 049cfd8..32bf77e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -21,8 +21,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,22 +33,20 @@
 public class FullFrameChannelReadInterface extends AbstractChannelReadInterface {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final Deque<ByteBuffer> riEmptyStack;
+    private final BlockingDeque<ByteBuffer> riEmptyStack;
     private final IChannelControlBlock ccb;
 
     FullFrameChannelReadInterface(IChannelControlBlock ccb) {
         this.ccb = ccb;
-        riEmptyStack = new ArrayDeque<>();
+        riEmptyStack = new LinkedBlockingDeque<>();
         credits = 0;
 
         emptyBufferAcceptor = buffer -> {
-            int delta = buffer.remaining();
-            synchronized (ccb) {
-                if (ccb.isRemotelyClosed()) {
-                    return;
-                }
-                riEmptyStack.push(buffer);
+            if (ccb.isRemotelyClosed()) {
+                return;
             }
+            riEmptyStack.push(buffer);
+            final int delta = buffer.remaining();
             ccb.addPendingCredits(delta);
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index b6a392e..4c3836a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.net.protocols.muxdemux;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
@@ -437,4 +438,8 @@
             }
         }
     }
+
+    public InetSocketAddress getRemoteAddress() {
+        return tcpConnection == null ? null : tcpConnection.getRemoteAddress();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 7fbde73..2240fd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 
@@ -53,21 +54,14 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
-        String ts = getCurrentTimestamp();
-        String baseName = ts + DELIMITER + ts;
-        // Begin timestamp and end timestamp are identical since it is a flush
+        String baseName = getNextComponentSequence(btreeFilter);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
                 hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
     }
 
     @Override
-    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
-            throws HyracksDataException {
-        String[] firstTimestampRange = firstFileName.split(DELIMITER);
-        String[] lastTimestampRange = lastFileName.split(DELIMITER);
-
-        String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
-        // Get the range of timestamps by taking the earliest and the latest timestamps
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+        final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
                 hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
     }
@@ -75,17 +69,17 @@
     @Override
     public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
         List<LSMComponentFileReferences> validFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
         // create transaction filter <to hide transaction files>
         FilenameFilter transactionFilter = getTransactionFileFilter(false);
         // List of valid BTree files.
         cleanupAndGetValidFilesInternal(getCompoundFilter(transactionFilter, btreeFilter), btreeFactory, allBTreeFiles,
                 btreeFactory.getBufferCache());
         HashSet<String> btreeFilesSet = new HashSet<>();
-        for (ComparableFileName cmpFileName : allBTreeFiles) {
-            int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
-            btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+        for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+            int index = cmpFileName.getFileName().lastIndexOf(DELIMITER);
+            btreeFilesSet.add(cmpFileName.getFileName().substring(0, index));
         }
 
         if (hasBloomFilter) {
@@ -104,53 +98,51 @@
 
         // Special case: sorting is not required
         if (allBTreeFiles.size() == 1 && (!hasBloomFilter || allBloomFilterFiles.size() == 1)) {
-            validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef, null,
-                    hasBloomFilter ? allBloomFilterFiles.get(0).fileRef : null));
+            validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).getFileRef(), null,
+                    hasBloomFilter ? allBloomFilterFiles.get(0).getFileRef() : null));
             return validFiles;
         }
 
-        // Sorts files names from earliest to latest timestamp.
+        // Sorts files names from earliest to latest sequence.
         Collections.sort(allBTreeFiles);
         if (hasBloomFilter) {
             Collections.sort(allBloomFilterFiles);
         }
 
-        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
-        ComparableFileName lastBTree = allBTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
         validComparableBTreeFiles.add(lastBTree);
 
-        List<ComparableFileName> validComparableBloomFilterFiles = null;
-        ComparableFileName lastBloomFilter = null;
+        List<IndexComponentFileReference> validComparableBloomFilterFiles = null;
+        IndexComponentFileReference lastBloomFilter = null;
         if (hasBloomFilter) {
             validComparableBloomFilterFiles = new ArrayList<>();
             lastBloomFilter = allBloomFilterFiles.get(0);
             validComparableBloomFilterFiles.add(lastBloomFilter);
         }
 
-        ComparableFileName currentBTree;
-        ComparableFileName currentBloomFilter = null;
+        IndexComponentFileReference currentBTree;
+        IndexComponentFileReference currentBloomFilter = null;
         for (int i = 1; i < allBTreeFiles.size(); i++) {
             currentBTree = allBTreeFiles.get(i);
             if (hasBloomFilter) {
                 currentBloomFilter = allBloomFilterFiles.get(i);
             }
-            // Current start timestamp is greater than last stop timestamp.
-            if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
-                    && (!hasBloomFilter || currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0)) {
+            // Current start sequence is greater than last stop sequence.
+            if (currentBTree.isMoreRecentThan(lastBTree)
+                    && (!hasBloomFilter || currentBloomFilter.isMoreRecentThan(lastBloomFilter))) {
                 validComparableBTreeFiles.add(currentBTree);
                 lastBTree = currentBTree;
                 if (hasBloomFilter) {
                     validComparableBloomFilterFiles.add(currentBloomFilter);
                     lastBloomFilter = currentBloomFilter;
                 }
-            } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
-                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
-                    && (!hasBloomFilter || (currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
-                            && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0))) {
+            } else if (currentBTree.isWithin(lastBTree)
+                    && (!hasBloomFilter || currentBloomFilter.isWithin(lastBloomFilter))) {
                 // Invalid files are completely contained in last interval.
-                delete(btreeFactory.getBufferCache(), currentBTree.fullPath);
+                delete(btreeFactory.getBufferCache(), currentBTree.getFullPath());
                 if (hasBloomFilter) {
-                    delete(btreeFactory.getBufferCache(), currentBloomFilter.fullPath);
+                    delete(btreeFactory.getBufferCache(), currentBloomFilter.getFullPath());
                 }
             } else {
                 // This scenario should not be possible.
@@ -161,21 +153,21 @@
         // Sort valid files in reverse lexicographical order, such that newer
         // files come first.
         Collections.sort(validComparableBTreeFiles, recencyCmp);
-        Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
-        Iterator<ComparableFileName> bloomFilterFileIter = null;
+        Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> bloomFilterFileIter = null;
         if (hasBloomFilter) {
             Collections.sort(validComparableBloomFilterFiles, recencyCmp);
             bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
         }
-        ComparableFileName cmpBTreeFileName = null;
-        ComparableFileName cmpBloomFilterFileName = null;
+        IndexComponentFileReference cmpBTreeFileName = null;
+        IndexComponentFileReference cmpBloomFilterFileName = null;
         while (btreeFileIter.hasNext() && (hasBloomFilter ? bloomFilterFileIter.hasNext() : true)) {
             cmpBTreeFileName = btreeFileIter.next();
             if (hasBloomFilter) {
                 cmpBloomFilterFileName = bloomFilterFileIter.next();
             }
-            validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, null,
-                    hasBloomFilter ? cmpBloomFilterFileName.fileRef : null));
+            validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.getFileRef(), null,
+                    hasBloomFilter ? cmpBloomFilterFileName.getFileRef() : null));
         }
 
         return validFiles;
@@ -183,11 +175,10 @@
 
     @Override
     public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
-        String ts = getCurrentTimestamp();
+        String sequence = getNextComponentSequence(btreeFilter);
         // Create transaction lock file
-        IoUtil.create(baseDir.getChild(TXN_PREFIX + ts));
-        String baseName = ts + DELIMITER + ts;
-        // Begin timestamp and end timestamp are identical since it is a transaction
+        IoUtil.create(baseDir.getChild(TXN_PREFIX + sequence));
+        String baseName = getNextComponentSequence(btreeFilter);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
index 23c2367..8fb3751 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 
@@ -58,23 +59,15 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
-        String ts = getCurrentTimestamp();
-        String baseName = ts + DELIMITER + ts;
-        // Begin timestamp and end timestamp are identical since it is a flush
+        String baseName = getNextComponentSequence(btreeFilter);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
     }
 
     @Override
-    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
-            throws HyracksDataException {
-        String[] firstTimestampRange = firstFileName.split(DELIMITER);
-        String[] lastTimestampRange = lastFileName.split(DELIMITER);
-
-        String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
-        // Get the range of timestamps by taking the earliest and the latest
-        // timestamps
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+        final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -83,18 +76,17 @@
     @Override
     public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
         List<LSMComponentFileReferences> validFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBuddyBTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBuddyBTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
         // Create transaction file filter
         FilenameFilter transactionFilefilter = getTransactionFileFilter(false);
         // Gather files.
         cleanupAndGetValidFilesInternal(getCompoundFilter(btreeFilter, transactionFilefilter), btreeFactory,
                 allBTreeFiles, btreeFactory.getBufferCache());
         HashSet<String> btreeFilesSet = new HashSet<>();
-        for (ComparableFileName cmpFileName : allBTreeFiles) {
-            int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
-            btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+        for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+            btreeFilesSet.add(cmpFileName.getSequence());
         }
         validateFiles(btreeFilesSet, allBuddyBTreeFiles, getCompoundFilter(buddyBtreeFilter, transactionFilefilter),
                 buddyBtreeFactory, btreeFactory.getBufferCache());
@@ -109,52 +101,47 @@
             return validFiles;
         }
         if (allBTreeFiles.size() == 1 && allBuddyBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
-            validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef,
-                    allBuddyBTreeFiles.get(0).fileRef, allBloomFilterFiles.get(0).fileRef));
+            validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).getFileRef(),
+                    allBuddyBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
             return validFiles;
         }
 
-        // Sorts files names from earliest to latest timestamp.
+        // Sorts files names from earliest to latest sequence.
         Collections.sort(allBTreeFiles);
         Collections.sort(allBuddyBTreeFiles);
         Collections.sort(allBloomFilterFiles);
 
-        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
-        ComparableFileName lastBTree = allBTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
         validComparableBTreeFiles.add(lastBTree);
 
-        List<ComparableFileName> validComparableBuddyBTreeFiles = new ArrayList<>();
-        ComparableFileName lastBuddyBTree = allBuddyBTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableBuddyBTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastBuddyBTree = allBuddyBTreeFiles.get(0);
         validComparableBuddyBTreeFiles.add(lastBuddyBTree);
 
-        List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
-        ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+        List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+        IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
         validComparableBloomFilterFiles.add(lastBloomFilter);
 
         for (int i = 1; i < allBTreeFiles.size(); i++) {
-            ComparableFileName currentBTree = allBTreeFiles.get(i);
-            ComparableFileName currentBuddyBTree = allBuddyBTreeFiles.get(i);
-            ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
-            // Current start timestamp is greater than last stop timestamp.
-            if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
-                    && currentBuddyBTree.interval[0].compareTo(lastBuddyBTree.interval[1]) > 0
-                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+            IndexComponentFileReference currentBTree = allBTreeFiles.get(i);
+            IndexComponentFileReference currentBuddyBTree = allBuddyBTreeFiles.get(i);
+            IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+            // Current start sequence is greater than last stop sequence
+            if (currentBTree.isMoreRecentThan(lastBTree) && currentBuddyBTree.isMoreRecentThan(lastBuddyBTree)
+                    && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
                 validComparableBTreeFiles.add(currentBTree);
                 validComparableBuddyBTreeFiles.add(currentBuddyBTree);
                 validComparableBloomFilterFiles.add(currentBloomFilter);
                 lastBTree = currentBTree;
                 lastBuddyBTree = currentBuddyBTree;
                 lastBloomFilter = currentBloomFilter;
-            } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
-                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
-                    && currentBuddyBTree.interval[0].compareTo(lastBuddyBTree.interval[0]) >= 0
-                    && currentBuddyBTree.interval[1].compareTo(lastBuddyBTree.interval[1]) <= 0
-                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
-                    && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
-                // Invalid files are completely contained in last interval.
-                delete(treeFactory.getBufferCache(), currentBTree.fullPath);
-                delete(treeFactory.getBufferCache(), currentBuddyBTree.fullPath);
-                delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+            } else if (currentBTree.isWithin(lastBTree) && currentBuddyBTree.isWithin(lastBuddyBTree)
+                    && currentBloomFilter.isWithin(lastBloomFilter)) {
+                // Invalid files are completely contained in last sequence.
+                delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
+                delete(treeFactory.getBufferCache(), currentBuddyBTree.getFullPath());
+                delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
             } else {
                 // This scenario should not be possible.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -163,19 +150,19 @@
 
         // Sort valid files in reverse lexicographical order, such that newer
         // files come first.
-        Collections.sort(validComparableBTreeFiles, recencyCmp);
-        Collections.sort(validComparableBuddyBTreeFiles, recencyCmp);
-        Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+        validComparableBTreeFiles.sort(recencyCmp);
+        validComparableBuddyBTreeFiles.sort(recencyCmp);
+        validComparableBloomFilterFiles.sort(recencyCmp);
 
-        Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
-        Iterator<ComparableFileName> buddyBtreeFileIter = validComparableBuddyBTreeFiles.iterator();
-        Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+        Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> buddyBtreeFileIter = validComparableBuddyBTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
         while (btreeFileIter.hasNext() && buddyBtreeFileIter.hasNext()) {
-            ComparableFileName cmpBTreeFileName = btreeFileIter.next();
-            ComparableFileName cmpBuddyBTreeFileName = buddyBtreeFileIter.next();
-            ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
-            validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, cmpBuddyBTreeFileName.fileRef,
-                    cmpBloomFilterFileName.fileRef));
+            IndexComponentFileReference cmpBTreeFileName = btreeFileIter.next();
+            IndexComponentFileReference cmpBuddyBTreeFileName = buddyBtreeFileIter.next();
+            IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+            validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.getFileRef(),
+                    cmpBuddyBTreeFileName.getFileRef(), cmpBloomFilterFileName.getFileRef()));
         }
 
         return validFiles;
@@ -183,10 +170,9 @@
 
     @Override
     public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
-        String ts = getCurrentTimestamp();
         // Create transaction lock file
-        Files.createFile(Paths.get(baseDir + TXN_PREFIX + ts));
-        String baseName = ts + DELIMITER + ts;
+        String baseName = getNextComponentSequence(btreeFilter);
+        Files.createFile(Paths.get(baseDir + TXN_PREFIX + baseName));
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
index e6aa2d1..a5c6360 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -38,5 +38,4 @@
      * @return the index of the current memory component
      */
     int getCurrentComponentIndex();
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 57fd01d..1f481c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -21,12 +21,9 @@
 
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.text.Format;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 
@@ -42,7 +39,9 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 
+@NotThreadSafe
 public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManager {
 
     public enum TreeIndexState {
@@ -76,22 +75,18 @@
      */
     public static final String TXN_PREFIX = ".T";
 
-    public static final String COMPONENT_TIMESTAMP_FORMAT = "yyyy-MM-dd-HH-mm-ss-SSS";
-
     public static final FilenameFilter COMPONENT_FILES_FILTER = (dir, name) -> !name.startsWith(".");
     protected static final FilenameFilter txnFileNameFilter = (dir, name) -> name.startsWith(TXN_PREFIX);
     protected static FilenameFilter bloomFilterFilter =
             (dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX);
-    protected static final FilenameFilter dummyFilter = (dir, name) -> true;
     protected static final Comparator<String> cmp = new FileNameComparator();
+    private static final FilenameFilter dummyFilter = (dir, name) -> true;
 
     protected final IIOManager ioManager;
     // baseDir should reflect dataset name and partition name and be absolute
     protected final FileReference baseDir;
-    protected final Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
-    protected final Comparator<ComparableFileName> recencyCmp = new RecencyComparator();
+    protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator();
     protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
-    private String prevTimestamp = null;
 
     public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> treeFactory) {
@@ -131,18 +126,18 @@
     }
 
     protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
-            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
             IBufferCache bufferCache) throws HyracksDataException {
         String[] files = listDirFiles(baseDir, filter);
         for (String fileName : files) {
             FileReference fileRef = baseDir.getChild(fileName);
             if (treeFactory == null) {
-                allFiles.add(new ComparableFileName(fileRef));
+                allFiles.add(IndexComponentFileReference.of(fileRef));
                 continue;
             }
             TreeIndexState idxState = isValidTreeIndex(treeFactory.createIndexInstance(fileRef));
             if (idxState == TreeIndexState.VALID) {
-                allFiles.add(new ComparableFileName(fileRef));
+                allFiles.add(IndexComponentFileReference.of(fileRef));
             } else if (idxState == TreeIndexState.INVALID) {
                 bufferCache.deleteFile(fileRef);
             }
@@ -167,18 +162,16 @@
         return files;
     }
 
-    protected void validateFiles(HashSet<String> groundTruth, ArrayList<ComparableFileName> validFiles,
+    protected void validateFiles(HashSet<String> groundTruth, ArrayList<IndexComponentFileReference> validFiles,
             FilenameFilter filter, TreeIndexFactory<? extends ITreeIndex> treeFactory, IBufferCache bufferCache)
             throws HyracksDataException {
-        ArrayList<ComparableFileName> tmpAllInvListsFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> tmpAllInvListsFiles = new ArrayList<>();
         cleanupAndGetValidFilesInternal(filter, treeFactory, tmpAllInvListsFiles, bufferCache);
-        for (ComparableFileName cmpFileName : tmpAllInvListsFiles) {
-            int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
-            String file = cmpFileName.fileName.substring(0, index);
-            if (groundTruth.contains(file)) {
+        for (IndexComponentFileReference cmpFileName : tmpAllInvListsFiles) {
+            if (groundTruth.contains(cmpFileName.getSequence())) {
                 validFiles.add(cmpFileName);
             } else {
-                delete(bufferCache, cmpFileName.fullPath);
+                delete(bufferCache, cmpFileName.getFullPath());
             }
         }
     }
@@ -198,30 +191,20 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
-        String ts = getCurrentTimestamp();
-        // Begin timestamp and end timestamp are identical since it is a flush
-        return new LSMComponentFileReferences(baseDir.getChild(ts + DELIMITER + ts), null, null);
+        final String sequence = getNextComponentSequence(COMPONENT_FILES_FILTER);
+        return new LSMComponentFileReferences(baseDir.getChild(sequence), null, null);
     }
 
     @Override
-    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
-            throws HyracksDataException {
-        String[] firstTimestampRange = firstFileName.split(DELIMITER);
-        String[] lastTimestampRange = lastFileName.split(DELIMITER);
-        String start = firstTimestampRange[0];
-        String end = lastTimestampRange[1];
-        if (end.compareTo(start) <= 0) {
-            throw new IllegalArgumentException(
-                    "A Merge file must have end greater than start. Found end: " + end + " and start: " + start);
-        }
-        // Get the range of timestamps by taking the earliest and the latest timestamps
-        return new LSMComponentFileReferences(baseDir.getChild(start + DELIMITER + end), null, null);
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+        final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
+        return new LSMComponentFileReferences(baseDir.getChild(baseName), null, null);
     }
 
     @Override
     public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
         List<LSMComponentFileReferences> validFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allFiles = new ArrayList<>();
 
         // Gather files and delete invalid files
         // There are two types of invalid files:
@@ -235,40 +218,37 @@
         }
 
         if (allFiles.size() == 1) {
-            validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
+            validFiles.add(new LSMComponentFileReferences(allFiles.get(0).getFileRef(), null, null));
             return validFiles;
         }
 
-        // Sorts files names from earliest to latest timestamp.
+        // Sorts files names from earliest to latest
         Collections.sort(allFiles);
 
-        List<ComparableFileName> validComparableFiles = new ArrayList<>();
-        ComparableFileName last = allFiles.get(0);
+        List<IndexComponentFileReference> validComparableFiles = new ArrayList<>();
+        IndexComponentFileReference last = allFiles.get(0);
         validComparableFiles.add(last);
         for (int i = 1; i < allFiles.size(); i++) {
-            ComparableFileName current = allFiles.get(i);
-            // The current start timestamp is greater than last stop timestamp so current is valid.
-            if (current.interval[0].compareTo(last.interval[1]) > 0) {
+            IndexComponentFileReference current = allFiles.get(i);
+            if (current.isMoreRecentThan(last)) {
+                // The current start sequence is greater than last stop sequence so current is valid.
                 validComparableFiles.add(current);
                 last = current;
-            } else if (current.interval[0].compareTo(last.interval[0]) >= 0
-                    && current.interval[1].compareTo(last.interval[1]) <= 0) {
+            } else if (current.isWithin(last)) {
                 // The current file is completely contained in the interval of the
                 // last file. Thus the last file must contain at least as much information
                 // as the current file, so delete the current file.
-                delete(treeFactory.getBufferCache(), current.fullPath);
+                delete(treeFactory.getBufferCache(), current.getFullPath());
             } else {
                 // This scenario should not be possible since timestamps are monotonically increasing.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
             }
         }
-
         // Sort valid files in reverse lexicographical order, such that newer files come first.
-        Collections.sort(validComparableFiles, recencyCmp);
-        for (ComparableFileName cmpFileName : validComparableFiles) {
-            validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
+        validComparableFiles.sort(recencyCmp);
+        for (IndexComponentFileReference cmpFileName : validComparableFiles) {
+            validFiles.add(new LSMComponentFileReferences(cmpFileName.getFileRef(), null, null));
         }
-
         return validFiles;
     }
 
@@ -287,8 +267,7 @@
     private static class FileNameComparator implements Comparator<String> {
         @Override
         public int compare(String a, String b) {
-            // Consciously ignoring locale.
-            return -a.compareTo(b);
+            return IndexComponentFileReference.of(b).compareTo(IndexComponentFileReference.of(a));
         }
     }
 
@@ -309,45 +288,14 @@
         }
     }
 
-    protected class ComparableFileName implements Comparable<ComparableFileName> {
-        public final FileReference fileRef;
-        public final String fullPath;
-        public final String fileName;
-
-        // Timestamp interval.
-        public final String[] interval;
-
-        public ComparableFileName(FileReference fileRef) {
-            this.fileRef = fileRef;
-            this.fullPath = fileRef.getFile().getAbsolutePath();
-            this.fileName = fileRef.getFile().getName();
-            interval = fileName.split(DELIMITER);
-        }
-
+    private class RecencyComparator implements Comparator<IndexComponentFileReference> {
         @Override
-        public int compareTo(ComparableFileName b) {
-            int startCmp = interval[0].compareTo(b.interval[0]);
+        public int compare(IndexComponentFileReference a, IndexComponentFileReference b) {
+            int startCmp = -Long.compare(a.getSequenceStart(), b.getSequenceStart());
             if (startCmp != 0) {
                 return startCmp;
             }
-            return b.interval[1].compareTo(interval[1]);
-        }
-
-        @Override
-        public String toString() {
-            return "{\"type\" : \"" + (interval[0].equals(interval[1]) ? "flush" : "merge") + "\", \"start\" : \""
-                    + interval[0] + "\", \"end\" : \"" + interval[1] + "\"}";
-        }
-    }
-
-    private class RecencyComparator implements Comparator<ComparableFileName> {
-        @Override
-        public int compare(ComparableFileName a, ComparableFileName b) {
-            int cmp = -a.interval[0].compareTo(b.interval[0]);
-            if (cmp != 0) {
-                return cmp;
-            }
-            return -a.interval[1].compareTo(b.interval[1]);
+            return -Long.compare(a.getSequenceEnd(), b.getSequenceEnd());
         }
     }
 
@@ -382,10 +330,10 @@
         return null;
     }
 
-    protected static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) {
+    private static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) {
         final String timeStamp =
                 transactionFileName.substring(transactionFileName.indexOf(TXN_PREFIX) + TXN_PREFIX.length());
-        return (dir, name) -> inclusive ? name.startsWith(timeStamp) : !name.startsWith(timeStamp);
+        return (dir, name) -> inclusive == name.startsWith(timeStamp);
     }
 
     protected FilenameFilter getTransactionFileFilter(boolean inclusive) throws HyracksDataException {
@@ -406,34 +354,12 @@
         return (dir, name) -> filter1.accept(dir, name) && filter2.accept(dir, name);
     }
 
-    /**
-     * @return The string format of the current timestamp.
-     *         The returned results of this method are guaranteed to not have duplicates.
-     */
-    protected String getCurrentTimestamp() {
-        Date date = new Date();
-        String ts = formatter.format(date);
-        /**
-         * prevent a corner case where the same timestamp can be given.
-         */
-        while (prevTimestamp != null && ts.compareTo(prevTimestamp) == 0) {
-            try {
-                Thread.sleep(1);
-                date = new Date();
-                ts = formatter.format(date);
-            } catch (InterruptedException e) {
-                //ignore
-            }
+    protected String getNextComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
+        long maxComponentSeq = -1;
+        final String[] files = listDirFiles(baseDir, filenameFilter);
+        for (String fileName : files) {
+            maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd());
         }
-        prevTimestamp = ts;
-        return ts;
-    }
-
-    public static String getComponentStartTime(String fileName) {
-        return fileName.split(DELIMITER)[0];
-    }
-
-    public static String getComponentEndTime(String fileName) {
-        return fileName.split(DELIMITER)[1];
+        return IndexComponentFileReference.getFlushSequence(maxComponentSeq + 1);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java
new file mode 100644
index 0000000..bbadf60
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.DELIMITER;
+
+import java.util.Objects;
+
+import org.apache.hyracks.api.io.FileReference;
+
+public class IndexComponentFileReference implements Comparable<IndexComponentFileReference> {
+
+    private FileReference fileRef;
+    private String fullPath;
+    private String fileName;
+    private long sequenceStart;
+    private long sequenceEnd;
+
+    private IndexComponentFileReference() {
+    }
+
+    public static IndexComponentFileReference of(String file) {
+        final IndexComponentFileReference ref = new IndexComponentFileReference();
+        ref.fileName = file;
+        final String[] splits = file.split(DELIMITER);
+        ref.sequenceStart = Long.parseLong(splits[0]);
+        ref.sequenceEnd = Long.parseLong(splits[1]);
+        return ref;
+    }
+
+    public static IndexComponentFileReference of(FileReference fileRef) {
+        final IndexComponentFileReference ref = of(fileRef.getFile().getName());
+        ref.fileRef = fileRef;
+        ref.fullPath = fileRef.getFile().getAbsolutePath();
+        return ref;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IndexComponentFileReference that = (IndexComponentFileReference) o;
+        return Objects.equals(fileName, that.fileName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fileName);
+    }
+
+    @Override
+    public int compareTo(IndexComponentFileReference o) {
+        int startCmp = Long.compare(sequenceStart, o.sequenceStart);
+        if (startCmp != 0) {
+            return startCmp;
+        }
+        return Long.compare(o.sequenceEnd, sequenceEnd);
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public long getSequenceStart() {
+        return sequenceStart;
+    }
+
+    public long getSequenceEnd() {
+        return sequenceEnd;
+    }
+
+    public String getFullPath() {
+        return fullPath;
+    }
+
+    public FileReference getFileRef() {
+        return fileRef;
+    }
+
+    public String getSequence() {
+        return sequenceStart + DELIMITER + sequenceEnd;
+    }
+
+    public boolean isMoreRecentThan(IndexComponentFileReference other) {
+        return sequenceStart > other.getSequenceEnd();
+    }
+
+    public boolean isWithin(IndexComponentFileReference other) {
+        return sequenceStart >= other.getSequenceStart() && sequenceEnd <= other.getSequenceEnd();
+    }
+
+    @Override
+    public String toString() {
+        return "{\"type\" : \"" + (isFlush() ? "flush" : "merge") + "\", \"start\" : \"" + sequenceStart
+                + "\", \"end\" : \"" + sequenceEnd + "\"}";
+    }
+
+    private boolean isFlush() {
+        return sequenceStart == sequenceEnd;
+    }
+
+    public static String getFlushSequence(long componentSequence) {
+        return componentSequence + DELIMITER + componentSequence;
+    }
+
+    public static String getMergeSequence(String firstComponentName, String lastComponentName) {
+        long mergeSequenceStart = IndexComponentFileReference.of(firstComponentName).getSequenceStart();
+        long mergeSequenceEnd = IndexComponentFileReference.of(lastComponentName).getSequenceEnd();
+        return mergeSequenceStart + DELIMITER + mergeSequenceEnd;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
index c7990a6..cf6c4a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -24,12 +24,14 @@
 public class LSMComponentId implements ILSMComponentId {
 
     public static final long NOT_FOUND = -1;
+    public static final long MIN_VALID_COMPONENT_ID = 0;
 
     // Used to represent an empty index with no components
     public static final LSMComponentId EMPTY_INDEX_LAST_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
 
     // A default component id used for bulk loaded component
-    public static final LSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+    public static final LSMComponentId DEFAULT_COMPONENT_ID =
+            new LSMComponentId(MIN_VALID_COMPONENT_ID, MIN_VALID_COMPONENT_ID);
 
     private long minId;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
index 3da57fd..21a27a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -21,55 +21,40 @@
 
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.util.annotations.ThreadSafe;
 
 /**
  * A default implementation of {@link ILSMComponentIdGenerator}.
- *
  */
+@ThreadSafe
 public class LSMComponentIdGenerator implements ILSMComponentIdGenerator {
 
     private final int numComponents;
     private int currentComponentIndex;
-    protected long previousTimestamp = -1L;
+    private long lastUsedId;
     private ILSMComponentId componentId;
 
-    public LSMComponentIdGenerator(int numComponents) {
+    public LSMComponentIdGenerator(int numComponents, long lastUsedId) {
         this.numComponents = numComponents;
+        this.lastUsedId = lastUsedId;
         refresh();
         currentComponentIndex = 0;
     }
 
     @Override
-    public void refresh() {
-        long ts = getCurrentTimestamp();
-        componentId = new LSMComponentId(ts, ts);
+    public synchronized void refresh() {
+        final long nextId = ++lastUsedId;
+        componentId = new LSMComponentId(nextId, nextId);
         currentComponentIndex = (currentComponentIndex + 1) % numComponents;
     }
 
     @Override
-    public ILSMComponentId getId() {
+    public synchronized ILSMComponentId getId() {
         return componentId;
     }
 
     @Override
-    public int getCurrentComponentIndex() {
+    public synchronized int getCurrentComponentIndex() {
         return currentComponentIndex;
     }
-
-    protected long getCurrentTimestamp() {
-        long timestamp = System.currentTimeMillis();
-        while (timestamp <= previousTimestamp) {
-            // make sure timestamp is strictly increasing
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-            timestamp = System.currentTimeMillis();
-        }
-        previousTimestamp = timestamp;
-        return timestamp;
-
-    }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 2f1eb87..4471102 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.BTreeFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
 
@@ -57,21 +58,15 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
-        String ts = getCurrentTimestamp();
-        String baseName = ts + DELIMITER + ts;
-        // Begin timestamp and end timestamp are identical since it is a flush
+        String baseName = getNextComponentSequence(deletedKeysBTreeFilter);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
     }
 
     @Override
-    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
-            throws HyracksDataException {
-        String[] firstTimestampRange = firstFileName.split(DELIMITER);
-        String[] lastTimestampRange = lastFileName.split(DELIMITER);
-        String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
-        // Get the range of timestamps by taking the earliest and the latest timestamps
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+        final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -80,18 +75,17 @@
     @Override
     public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
         List<LSMComponentFileReferences> validFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allDictBTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allInvListsFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allDeletedKeysBTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allDictBTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allInvListsFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allDeletedKeysBTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
 
         // Gather files.
         cleanupAndGetValidFilesInternal(deletedKeysBTreeFilter, btreeFactory, allDeletedKeysBTreeFiles,
                 btreeFactory.getBufferCache());
         HashSet<String> deletedKeysBTreeFilesSet = new HashSet<>();
-        for (ComparableFileName cmpFileName : allDeletedKeysBTreeFiles) {
-            int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
-            deletedKeysBTreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+        for (IndexComponentFileReference cmpFileName : allDeletedKeysBTreeFiles) {
+            deletedKeysBTreeFilesSet.add(cmpFileName.getSequence());
         }
 
         // TODO: do we really need to validate the inverted lists files or is validating the dict. BTrees is enough?
@@ -116,52 +110,48 @@
 
         if (allDictBTreeFiles.size() == 1 && allInvListsFiles.size() == 1 && allDeletedKeysBTreeFiles.size() == 1
                 && allBloomFilterFiles.size() == 1) {
-            validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).fileRef,
-                    allDeletedKeysBTreeFiles.get(0).fileRef, allBloomFilterFiles.get(0).fileRef));
+            validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).getFileRef(),
+                    allDeletedKeysBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
             return validFiles;
         }
 
-        // Sorts files names from earliest to latest timestamp.
+        // Sorts files names from earliest to latest sequence.
         Collections.sort(allDeletedKeysBTreeFiles);
         Collections.sort(allDictBTreeFiles);
         Collections.sort(allBloomFilterFiles);
 
-        List<ComparableFileName> validComparableDictBTreeFiles = new ArrayList<>();
-        ComparableFileName lastDictBTree = allDictBTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableDictBTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastDictBTree = allDictBTreeFiles.get(0);
         validComparableDictBTreeFiles.add(lastDictBTree);
 
-        List<ComparableFileName> validComparableDeletedKeysBTreeFiles = new ArrayList<>();
-        ComparableFileName lastDeletedKeysBTree = allDeletedKeysBTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableDeletedKeysBTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastDeletedKeysBTree = allDeletedKeysBTreeFiles.get(0);
         validComparableDeletedKeysBTreeFiles.add(lastDeletedKeysBTree);
 
-        List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
-        ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+        List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+        IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
         validComparableBloomFilterFiles.add(lastBloomFilter);
 
         for (int i = 1; i < allDictBTreeFiles.size(); i++) {
-            ComparableFileName currentDeletedKeysBTree = allDeletedKeysBTreeFiles.get(i);
-            ComparableFileName currentDictBTree = allDictBTreeFiles.get(i);
-            ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
-            // Current start timestamp is greater than last stop timestamp.
-            if (currentDeletedKeysBTree.interval[0].compareTo(lastDeletedKeysBTree.interval[1]) > 0
-                    && currentDictBTree.interval[0].compareTo(lastDictBTree.interval[1]) > 0
-                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+            IndexComponentFileReference currentDeletedKeysBTree = allDeletedKeysBTreeFiles.get(i);
+            IndexComponentFileReference currentDictBTree = allDictBTreeFiles.get(i);
+            IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+            // Current start sequence is greater than last stop sequence.
+            if (currentDeletedKeysBTree.isMoreRecentThan(lastDeletedKeysBTree)
+                    && currentDictBTree.isMoreRecentThan(lastDictBTree)
+                    && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
                 validComparableDictBTreeFiles.add(currentDictBTree);
                 validComparableDeletedKeysBTreeFiles.add(currentDeletedKeysBTree);
                 validComparableBloomFilterFiles.add(currentBloomFilter);
                 lastDictBTree = currentDictBTree;
                 lastDeletedKeysBTree = currentDeletedKeysBTree;
                 lastBloomFilter = currentBloomFilter;
-            } else if (currentDeletedKeysBTree.interval[0].compareTo(lastDeletedKeysBTree.interval[0]) >= 0
-                    && currentDeletedKeysBTree.interval[1].compareTo(lastDeletedKeysBTree.interval[1]) <= 0
-                    && currentDictBTree.interval[0].compareTo(lastDictBTree.interval[0]) >= 0
-                    && currentDictBTree.interval[1].compareTo(lastDictBTree.interval[1]) <= 0
-                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
-                    && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
-                // Invalid files are completely contained in last interval.
-                delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.fullPath);
-                delete(treeFactory.getBufferCache(), currentDictBTree.fullPath);
-                delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+            } else if (currentDeletedKeysBTree.isWithin(lastDeletedKeysBTree)
+                    && currentDictBTree.isWithin(lastDictBTree) && currentBloomFilter.isWithin(lastBloomFilter)) {
+                // Invalid files are completely contained in last sequence.
+                delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.getFullPath());
+                delete(treeFactory.getBufferCache(), currentDictBTree.getFullPath());
+                delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
             } else {
                 // This scenario should not be possible.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -170,21 +160,20 @@
 
         // Sort valid files in reverse lexicographical order, such that newer
         // files come first.
-        Collections.sort(validComparableDictBTreeFiles, recencyCmp);
-        Collections.sort(validComparableDeletedKeysBTreeFiles, recencyCmp);
-        Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+        validComparableDictBTreeFiles.sort(recencyCmp);
+        validComparableDeletedKeysBTreeFiles.sort(recencyCmp);
+        validComparableBloomFilterFiles.sort(recencyCmp);
 
-        Iterator<ComparableFileName> dictBTreeFileIter = validComparableDictBTreeFiles.iterator();
-        Iterator<ComparableFileName> deletedKeysBTreeIter = validComparableDeletedKeysBTreeFiles.iterator();
-        Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+        Iterator<IndexComponentFileReference> dictBTreeFileIter = validComparableDictBTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> deletedKeysBTreeIter = validComparableDeletedKeysBTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
         while (dictBTreeFileIter.hasNext() && deletedKeysBTreeIter.hasNext()) {
-            ComparableFileName cmpDictBTreeFile = dictBTreeFileIter.next();
-            ComparableFileName cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
-            ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
-            validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef,
-                    cmpBloomFilterFileName.fileRef));
+            IndexComponentFileReference cmpDictBTreeFile = dictBTreeFileIter.next();
+            IndexComponentFileReference cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
+            IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+            validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.getFileRef(),
+                    cmpDeletedKeysBTreeFile.getFileRef(), cmpBloomFilterFileName.getFileRef()));
         }
-
         return validFiles;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 2512776..3348407 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 
@@ -58,22 +59,15 @@
 
     @Override
     public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
-        String ts = getCurrentTimestamp();
-        String baseName = ts + DELIMITER + ts;
-        // Begin timestamp and end timestamp are identical since it is a flush
+        String baseName = getNextComponentSequence(btreeFilter);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
     }
 
     @Override
-    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
-            throws HyracksDataException {
-        String[] firstTimestampRange = firstFileName.split(DELIMITER);
-        String[] lastTimestampRange = lastFileName.split(DELIMITER);
-        String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
-        // Get the range of timestamps by taking the earliest and the latest
-        // timestamps
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+        final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -82,9 +76,9 @@
     @Override
     public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
         List<LSMComponentFileReferences> validFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allRTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allRTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
 
         // Create a transaction filter <- to hide transaction components->
         FilenameFilter transactionFilter = getTransactionFileFilter(false);
@@ -93,9 +87,8 @@
         cleanupAndGetValidFilesInternal(getCompoundFilter(transactionFilter, btreeFilter), btreeFactory, allBTreeFiles,
                 btreeFactory.getBufferCache());
         HashSet<String> btreeFilesSet = new HashSet<>();
-        for (ComparableFileName cmpFileName : allBTreeFiles) {
-            int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
-            btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+        for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+            btreeFilesSet.add(cmpFileName.getSequence());
         }
         validateFiles(btreeFilesSet, allRTreeFiles, getCompoundFilter(transactionFilter, rtreeFilter), rtreeFactory,
                 btreeFactory.getBufferCache());
@@ -113,52 +106,47 @@
         }
 
         if (allRTreeFiles.size() == 1 && allBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
-            validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef,
-                    allBloomFilterFiles.get(0).fileRef));
+            validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).getFileRef(),
+                    allBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
             return validFiles;
         }
 
-        // Sorts files names from earliest to latest timestamp.
+        // Sorts files names from earliest to latest sequence.
         Collections.sort(allRTreeFiles);
         Collections.sort(allBTreeFiles);
         Collections.sort(allBloomFilterFiles);
 
-        List<ComparableFileName> validComparableRTreeFiles = new ArrayList<>();
-        ComparableFileName lastRTree = allRTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableRTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastRTree = allRTreeFiles.get(0);
         validComparableRTreeFiles.add(lastRTree);
 
-        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
-        ComparableFileName lastBTree = allBTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
         validComparableBTreeFiles.add(lastBTree);
 
-        List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
-        ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+        List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+        IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
         validComparableBloomFilterFiles.add(lastBloomFilter);
 
         for (int i = 1; i < allRTreeFiles.size(); i++) {
-            ComparableFileName currentRTree = allRTreeFiles.get(i);
-            ComparableFileName currentBTree = allBTreeFiles.get(i);
-            ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
-            // Current start timestamp is greater than last stop timestamp.
-            if (currentRTree.interval[0].compareTo(lastRTree.interval[1]) > 0
-                    && currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
-                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+            IndexComponentFileReference currentRTree = allRTreeFiles.get(i);
+            IndexComponentFileReference currentBTree = allBTreeFiles.get(i);
+            IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+            // Current start sequence is greater than last stop sequence.
+            if (currentRTree.isMoreRecentThan(lastRTree) && currentBTree.isMoreRecentThan(lastBTree)
+                    && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
                 validComparableRTreeFiles.add(currentRTree);
                 validComparableBTreeFiles.add(currentBTree);
                 validComparableBloomFilterFiles.add(currentBloomFilter);
                 lastRTree = currentRTree;
                 lastBTree = currentBTree;
                 lastBloomFilter = currentBloomFilter;
-            } else if (currentRTree.interval[0].compareTo(lastRTree.interval[0]) >= 0
-                    && currentRTree.interval[1].compareTo(lastRTree.interval[1]) <= 0
-                    && currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
-                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
-                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
-                    && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
-                // Invalid files are completely contained in last interval.
-                delete(treeFactory.getBufferCache(), currentRTree.fullPath);
-                delete(treeFactory.getBufferCache(), currentBTree.fullPath);
-                delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+            } else if (currentRTree.isWithin(lastRTree) && currentBTree.isWithin(lastBTree)
+                    && currentBloomFilter.isWithin(lastBloomFilter)) {
+                // Invalid files are completely contained in last sequence.
+                delete(treeFactory.getBufferCache(), currentRTree.getFullPath());
+                delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
+                delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
             } else {
                 // This scenario should not be possible.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -167,29 +155,28 @@
 
         // Sort valid files in reverse lexicographical order, such that newer
         // files come first.
-        Collections.sort(validComparableRTreeFiles, recencyCmp);
-        Collections.sort(validComparableBTreeFiles, recencyCmp);
-        Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+        validComparableRTreeFiles.sort(recencyCmp);
+        validComparableBTreeFiles.sort(recencyCmp);
+        validComparableBloomFilterFiles.sort(recencyCmp);
 
-        Iterator<ComparableFileName> rtreeFileIter = validComparableRTreeFiles.iterator();
-        Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
-        Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+        Iterator<IndexComponentFileReference> rtreeFileIter = validComparableRTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
         while (rtreeFileIter.hasNext() && btreeFileIter.hasNext()) {
-            ComparableFileName cmpRTreeFileName = rtreeFileIter.next();
-            ComparableFileName cmpBTreeFileName = btreeFileIter.next();
-            ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
-            validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef,
-                    cmpBloomFilterFileName.fileRef));
+            IndexComponentFileReference cmpRTreeFileName = rtreeFileIter.next();
+            IndexComponentFileReference cmpBTreeFileName = btreeFileIter.next();
+            IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+            validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.getFileRef(), cmpBTreeFileName.getFileRef(),
+                    cmpBloomFilterFileName.getFileRef()));
         }
         return validFiles;
     }
 
     @Override
     public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
-        String ts = getCurrentTimestamp();
+        String baseName = getNextComponentSequence(btreeFilter);
         // Create transaction lock file
-        Files.createFile(Paths.get(baseDir + TXN_PREFIX + ts));
-        String baseName = ts + DELIMITER + ts;
+        Files.createFile(Paths.get(baseDir + TXN_PREFIX + baseName));
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
index 8ecbcc4..c255ee5 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
@@ -22,16 +22,19 @@
 import java.io.FilenameFilter;
 import java.util.ArrayList;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 
 public class TestLsmIndexFileManager extends AbstractLSMIndexFileManager {
 
+    private long componentSeq = 0;
+
     public TestLsmIndexFileManager(IIOManager ioManager, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> treeIndexFactory) {
         super(ioManager, file, treeIndexFactory);
@@ -39,12 +42,18 @@
 
     @Override
     protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
-            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles,
-            IBufferCache bufferCache) throws HyracksDataException {
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
+            IBufferCache bufferCache) {
         String[] files = baseDir.getFile().list(filter);
         for (String fileName : files) {
             FileReference fileRef = baseDir.getChild(fileName);
-            allFiles.add(new ComparableFileName(fileRef));
+            allFiles.add(IndexComponentFileReference.of(fileRef));
         }
     }
+
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        String sequence = IndexComponentFileReference.getFlushSequence(componentSeq++);
+        return new LSMComponentFileReferences(baseDir.getChild(sequence), null, null);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
index e75a961..183cb6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
@@ -29,6 +29,10 @@
         spanNanos = unit.toNanos(span);
     }
 
+    public long getSpanNanos() {
+        return spanNanos;
+    }
+
     public static Span start(long span, TimeUnit unit) {
         return new Span(span, unit);
     }