diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 50fa257..392bec8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -22,12 +22,12 @@
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ActiveManagerMessage extends AbstractApplicationMessage {
+public class ActiveManagerMessage implements IApplicationMessage {
     public static final byte STOP_ACTIVITY = 0x00;
 
     private static final long serialVersionUID = 1L;
@@ -62,7 +62,7 @@
     }
 
     @Override
-    public String type() {
-        return "ACTIVE_MANAGER_MESSAGE";
+    public String toString() {
+        return ActiveManagerMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 02affc4..fc67d3c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -22,12 +22,12 @@
 
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class ActivePartitionMessage extends AbstractApplicationMessage {
+public class ActivePartitionMessage implements IApplicationMessage {
 
     public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
     public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
@@ -70,7 +70,7 @@
     }
 
     @Override
-    public String type() {
-        return "ACTIVE_ENTITY_TO_CC_MESSAGE";
+    public String toString() {
+        return ActivePartitionMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index 265025f..ec0aaa8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -30,7 +30,6 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixAppRuntimeContextProviderForRecovery implements IAsterixAppRuntimeContextProvider {
 
@@ -76,11 +75,6 @@
     }
 
     @Override
-    public IResourceIdFactory getResourceIdFactory() {
-        return asterixAppRuntimeContext.getResourceIdFactory();
-    }
-
-    @Override
     public IIOManager getIOManager() {
         return asterixAppRuntimeContext.getIOManager();
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index 9127ee5..ce20c9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -60,7 +60,6 @@
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
@@ -73,6 +72,7 @@
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
 import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.application.IApplicationConfig;
@@ -105,6 +105,7 @@
 
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCApplicationContext ncApplicationContext;
+    private final IResourceIdFactory resourceIdFactory;
 
     private AsterixCompilerProperties compilerProperties;
     private AsterixExternalProperties externalProperties;
@@ -124,7 +125,6 @@
 
     private ILSMIOOperationScheduler lsmIOScheduler;
     private PersistentLocalResourceRepository localResourceRepository;
-    private IResourceIdFactory resourceIdFactory;
     private IIOManager ioManager;
     private boolean isShuttingdown;
 
@@ -171,6 +171,7 @@
         }
         allExtensions.addAll(new AsterixExtensionProperties(propertiesAccessor).getExtensions());
         ncExtensionManager = new NCExtensionManager(allExtensions);
+        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
 
     @Override
@@ -193,7 +194,7 @@
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
                 new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+                        ioManager, ncApplicationContext.getNodeId(), metadataProperties);
 
         localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
                 .createRepository();
@@ -209,7 +210,6 @@
             //delete any storage data before the resource factory is initialized
             localResourceRepository.deleteStorageData(true);
         }
-        initializeResourceIdFactory();
 
         datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager(),
@@ -290,6 +290,7 @@
         lccm.register((ILifeCycleComponent) datasetLifecycleManager);
         lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+
     }
 
     @Override
@@ -442,11 +443,6 @@
     }
 
     @Override
-    public void initializeResourceIdFactory() throws HyracksDataException {
-        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
-    }
-
-    @Override
     public void initializeMetadata(boolean newUniverse) throws Exception {
         IAsterixStateProxy proxy;
         if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 1345aa7..d1d7ff7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -22,7 +22,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.hyracks.api.messages.IMessage;
@@ -41,9 +40,9 @@
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        IApplicationMessage absMessage = (IApplicationMessage) message;
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message: " + absMessage.type());
+            LOGGER.info("Received message: " + absMessage);
         }
         absMessage.handle(ccs);
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 9851b61..1beff82 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -20,19 +20,14 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -44,8 +39,6 @@
     private static final Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
 
     private final NodeControllerService ncs;
-    private final AtomicLong messageId = new AtomicLong(0);
-    private final Map<Long, IApplicationMessageCallback> callbacks;
     private final IAsterixAppRuntimeContext appContext;
     private final LinkedBlockingQueue<IApplicationMessage> receivedMsgsQ;
     private final ConcurrentFramePool messagingFramePool;
@@ -54,7 +47,6 @@
     public NCMessageBroker(NodeControllerService ncs, MessagingProperties messagingProperties) {
         this.ncs = ncs;
         appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
-        callbacks = new ConcurrentHashMap<>();
         maxMsgSize = messagingProperties.getFrameSize();
         int messagingMemoryBudget = messagingProperties.getFrameSize() * messagingProperties.getFrameCount();
         messagingFramePool = new ConcurrentFramePool(ncs.getId(), messagingMemoryBudget,
@@ -65,27 +57,15 @@
     }
 
     @Override
-    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
-        registerMsgCallback(message, callback);
-        try {
-            ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
-        } catch (Exception e) {
-            handleMsgDeliveryFailure(message);
-            throw e;
-        }
+    public void sendMessageToCC(IApplicationMessage message) throws Exception {
+        ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
     }
 
     @Override
-    public void sendMessageToNC(String nodeId, IApplicationMessage message, IApplicationMessageCallback callback)
+    public void sendMessageToNC(String nodeId, IApplicationMessage message)
             throws Exception {
-        registerMsgCallback(message, callback);
-        try {
-            IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
-            sendMessageToChannel(messagingChannel, message);
-        } catch (Exception e) {
-            handleMsgDeliveryFailure(message);
-            throw e;
-        }
+        IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
+        sendMessageToChannel(messagingChannel, message);
     }
 
     @Override
@@ -95,14 +75,9 @@
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        IApplicationMessage absMessage = (IApplicationMessage) message;
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message: " + absMessage.type());
-        }
-        //if the received message is a response to a sent message, deliver it to the sender
-        IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
-        if (callback != null) {
-            callback.deliverMessageResponse(absMessage);
+            LOGGER.info("Received message: " + absMessage);
         }
         absMessage.handle(ncs);
     }
@@ -111,18 +86,6 @@
         return messagingFramePool;
     }
 
-    private void registerMsgCallback(IApplicationMessage message, IApplicationMessageCallback callback) {
-        if (callback != null) {
-            long uniqueMessageId = messageId.incrementAndGet();
-            message.setId(uniqueMessageId);
-            callbacks.put(uniqueMessageId, callback);
-        }
-    }
-
-    private void handleMsgDeliveryFailure(IApplicationMessage message) {
-        callbacks.remove(message.getId());
-    }
-
     private void sendMessageToChannel(IChannelControlBlock ccb, IApplicationMessage msg) throws IOException {
         byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
         if (serializedMsg.length > maxMsgSize) {
@@ -161,8 +124,8 @@
                     Thread.currentThread().interrupt();
                 } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING) && msg != null) {
-                        LOGGER.log(Level.WARNING, "Could not process message with id: " + msg.getId() + " and type: "
-                                + msg.type(), e);
+                        LOGGER.log(Level.WARNING, "Could not process message : "
+                                + msg, e);
                     } else {
                         if (LOGGER.isLoggable(Level.WARNING)) {
                             LOGGER.log(Level.WARNING, "Could not process message", e);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 046d5c1..c009152 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -86,8 +86,6 @@
 
     public ILibraryManager getLibraryManager();
 
-    public void initializeResourceIdFactory() throws HyracksDataException;
-
     /**
      * Exports the metadata node to the metadata RMI port.
      *
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 552ce22..04e30ff 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -20,22 +20,22 @@
 
 import java.util.List;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
-import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 
-public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
+public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IIndex> {
     /**
-     * @param datasetID
-     * @param resourceID
+     * @param datasetId
+     * @param indexId
      * @return The corresponding index, or null if it is not found in the registered indexes.
      * @throws HyracksDataException
      */
-    IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException;
+    IIndex getIndex(int datasetId, long indexId) throws HyracksDataException;
 
     /**
      * Flushes all open datasets synchronously.
@@ -75,7 +75,7 @@
      * @param datasetID
      * @return
      */
-    ILSMOperationTracker getOperationTracker(int datasetID);
+    PrimaryIndexOperationTracker getOperationTracker(int datasetID);
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 21500b7..5c1d094 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.context;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index 3d112ef..70339f3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -27,7 +27,7 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -45,7 +45,7 @@
     private final IDatasetLifecycleManager datasetLifecycleManager;
     private final int datasetID;
 
-    public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) {
+    public CorrelatedPrefixMergePolicy(IResourceLifecycleManager datasetLifecycleManager, int datasetID) {
         this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
         this.datasetID = datasetID;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index ce405fc..3b65123 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -27,7 +27,7 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -61,7 +61,7 @@
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(ilcm, datasetID);
         policy.configure(properties);
         return policy;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
new file mode 100644
index 0000000..6a2cc56
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
+    private final Map<Long, IndexInfo> indexes;
+    private final int datasetID;
+    private long lastAccess;
+    private int numActiveIOOps;
+    private boolean isExternal;
+    private boolean isRegistered;
+    private boolean memoryAllocated;
+    private boolean durable;
+
+    public DatasetInfo(int datasetID) {
+        this.indexes = new HashMap<>();
+        this.setLastAccess(-1);
+        this.datasetID = datasetID;
+        this.setRegistered(false);
+        this.setMemoryAllocated(false);
+    }
+
+    @Override
+    public void touch() {
+        super.touch();
+        setLastAccess(System.currentTimeMillis());
+    }
+
+    @Override
+    public void untouch() {
+        super.untouch();
+        setLastAccess(System.currentTimeMillis());
+    }
+
+    public synchronized void declareActiveIOOperation() {
+        setNumActiveIOOps(getNumActiveIOOps() + 1);
+    }
+
+    public synchronized void undeclareActiveIOOperation() {
+        setNumActiveIOOps(getNumActiveIOOps() - 1);
+        //notify threads waiting on this dataset info
+        notifyAll();
+    }
+
+    public synchronized Set<ILSMIndex> getDatasetIndexes() {
+        Set<ILSMIndex> datasetIndexes = new HashSet<>();
+        for (IndexInfo iInfo : getIndexes().values()) {
+            if (iInfo.isOpen()) {
+                datasetIndexes.add(iInfo.getIndex());
+            }
+        }
+
+        return datasetIndexes;
+    }
+
+    @Override
+    public int compareTo(DatasetInfo i) {
+        // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
+        //
+        // Example sort order:
+        // -------------------
+        // (F, 0, 70)       <-- largest
+        // (F, 0, 60)
+        // (T, 10, 80)
+        // (T, 10, 70)
+        // (T, 9, 90)
+        // (T, 0, 100)      <-- smallest
+        if (isOpen() && !i.isOpen()) {
+            return -1;
+        } else if (!isOpen() && i.isOpen()) {
+            return 1;
+        } else {
+            if (getReferenceCount() < i.getReferenceCount()) {
+                return -1;
+            } else if (getReferenceCount() > i.getReferenceCount()) {
+                return 1;
+            } else {
+                if (getLastAccess() < i.getLastAccess()) {
+                    return -1;
+                } else if (getLastAccess() > i.getLastAccess()) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof DatasetInfo) {
+            return datasetID == ((DatasetInfo) obj).datasetID;
+        }
+        return false;
+    };
+
+    @Override
+    public int hashCode() {
+        return datasetID;
+    }
+
+    @Override
+    public String toString() {
+        return "DatasetID: " + getDatasetID() + ", isOpen: " + isOpen() + ", refCount: " + getReferenceCount()
+                + ", lastAccess: " + getLastAccess() + ", isRegistered: " + isRegistered() + ", memoryAllocated: "
+                + isMemoryAllocated() + ", isDurable: " + isDurable();
+    }
+
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public int getNumActiveIOOps() {
+        return numActiveIOOps;
+    }
+
+    public void setNumActiveIOOps(int numActiveIOOps) {
+        this.numActiveIOOps = numActiveIOOps;
+    }
+
+    public boolean isExternal() {
+        return isExternal;
+    }
+
+    public void setExternal(boolean isExternal) {
+        this.isExternal = isExternal;
+    }
+
+    public Map<Long, IndexInfo> getIndexes() {
+        return indexes;
+    }
+
+    public boolean isRegistered() {
+        return isRegistered;
+    }
+
+    public void setRegistered(boolean isRegistered) {
+        this.isRegistered = isRegistered;
+    }
+
+    public void setDurable(boolean durable) {
+        this.durable = durable;
+    }
+
+    public int getDatasetID() {
+        return datasetID;
+    }
+
+    public boolean isMemoryAllocated() {
+        return memoryAllocated;
+    }
+
+    public void setMemoryAllocated(boolean memoryAllocated) {
+        this.memoryAllocated = memoryAllocated;
+    }
+
+    public long getLastAccess() {
+        return lastAccess;
+    }
+
+    public void setLastAccess(long lastAccess) {
+        this.lastAccess = lastAccess;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f4eec05..698dfb4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -22,11 +22,9 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
@@ -40,22 +38,16 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
-import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
 public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
+    private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
     private final AsterixStorageProperties storageProperties;
-    private final Map<Integer, DatasetVirtualBufferCaches> datasetVirtualBufferCachesMap;
-    private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
-    private final Map<Integer, DatasetInfo> datasetInfos;
     private final ILocalResourceRepository resourceRepository;
     private final int firstAvilableUserDatasetID;
     private final long capacity;
@@ -63,7 +55,7 @@
     private final ILogManager logManager;
     private final LogRecord logRecord;
     private final int numPartitions;
-    private boolean stopped = false;
+    private volatile boolean stopped = false;
 
     public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
             ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager,
@@ -73,16 +65,13 @@
         this.resourceRepository = resourceRepository;
         this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
         this.numPartitions = numPartitions;
-        datasetVirtualBufferCachesMap = new HashMap<>();
-        datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
-        datasetInfos = new HashMap<Integer, DatasetInfo>();
         capacity = storageProperties.getMemoryComponentGlobalBudget();
         used = 0;
         logRecord = new LogRecord();
     }
 
     @Override
-    public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
+    public synchronized IIndex get(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         int datasetID = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -92,15 +81,11 @@
     @Override
     public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
-        DatasetInfo dsInfo = datasetInfos.get(datasetID);
-        if (dsInfo == null) {
+        DatasetResource datasetResource = datasets.get(datasetID);
+        if (datasetResource == null) {
             return null;
         }
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
-        if (iInfo == null) {
-            return null;
-        }
-        return iInfo.index;
+        return datasetResource.getIndex(resourceID);
     }
 
     @Override
@@ -108,20 +93,11 @@
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null) {
-            dsInfo = getDatasetInfo(did);
+        DatasetResource datasetResource = datasets.get(did);
+        if (datasetResource == null) {
+            datasetResource = getDatasetLifecycle(did);
         }
-        if (!dsInfo.isRegistered) {
-            dsInfo.isExternal = !index.hasMemoryComponents();
-            dsInfo.isRegistered = true;
-            dsInfo.durable = ((ILSMIndex) index).isDurable();
-        }
-
-        if (dsInfo.indexes.containsKey(resourceID)) {
-            throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
-        }
-        dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
+        datasetResource.register(resourceID, index);
     }
 
     public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -146,24 +122,25 @@
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        IndexInfo iInfo = dsInfo == null ? null : dsInfo.indexes.get(resourceID);
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
-        if (dsInfo == null || iInfo == null) {
+        if (dsr == null || iInfo == null) {
             throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
         }
 
-        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers.get(dsInfo.datasetID);
-        if (iInfo.referenceCount != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
             throw new HyracksDataException("Cannot remove index while it is open. (Dataset reference count = "
-                    + iInfo.referenceCount + ", Operation tracker number of active operations = "
+                    + iInfo.getReferenceCount() + ", Operation tracker number of active operations = "
                     + opTracker.getNumActiveOperations() + ")");
         }
 
         // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
         // First wait for any ongoing IO operations
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     //notification will come from DatasetInfo class (undeclareActiveIOOperation)
                     dsInfo.wait();
@@ -173,16 +150,17 @@
             }
         }
 
-        if (iInfo.isOpen) {
-            ILSMOperationTracker indexOpTracker = iInfo.index.getOperationTracker();
+        if (iInfo.isOpen()) {
+            ILSMOperationTracker indexOpTracker = iInfo.getIndex().getOperationTracker();
             synchronized (indexOpTracker) {
-                iInfo.index.deactivate(false);
+                iInfo.getIndex().deactivate(false);
             }
         }
 
-        dsInfo.indexes.remove(resourceID);
-        if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty() && !dsInfo.isExternal) {
-            removeDatasetFromCache(dsInfo.datasetID);
+        dsInfo.getIndexes().remove(resourceID);
+        if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                && !dsInfo.isExternal()) {
+            removeDatasetFromCache(dsInfo.getDatasetID());
         }
     }
 
@@ -192,29 +170,28 @@
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null || !dsInfo.isRegistered) {
+        DatasetResource dsr = datasets.get(did);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        if (dsInfo == null || !dsInfo.isRegistered()) {
             throw new HyracksDataException(
                     "Failed to open index with resource ID " + resourceID + " since it does not exist.");
         }
 
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+        IndexInfo iInfo = dsInfo.getIndexes().get(resourceID);
         if (iInfo == null) {
             throw new HyracksDataException(
                     "Failed to open index with resource ID " + resourceID + " since it does not exist.");
         }
-        if (!dsInfo.isOpen && !dsInfo.isExternal) {
-            initializeDatasetVirtualBufferCache(did);
-        }
 
-        dsInfo.isOpen = true;
-        dsInfo.touch();
-        if (!iInfo.isOpen) {
-            ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
+        dsr.open(true);
+        dsr.touch();
+
+        if (!iInfo.isOpen()) {
+            ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
             synchronized (opTracker) {
-                iInfo.index.activate();
+                iInfo.getIndex().activate();
             }
-            iInfo.isOpen = true;
+            iInfo.setOpen(true);
         }
         iInfo.touch();
     }
@@ -225,14 +202,15 @@
          * that is not being used (refcount == 0) and has been least recently used, excluding metadata datasets.
          * The sort order defined for DatasetInfo maintains this. See DatasetInfo.compareTo().
          */
-        List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values());
-        Collections.sort(datasetInfosList);
-        for (DatasetInfo dsInfo : datasetInfosList) {
-            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers
-                    .get(dsInfo.datasetID);
-            if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsInfo.referenceCount == 0
-                    && dsInfo.isOpen && dsInfo.datasetID >= firstAvilableUserDatasetID) {
-                closeDataset(dsInfo);
+        List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values());
+        Collections.sort(datasetsResources);
+        for (DatasetResource dsr : datasetsResources) {
+            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+            if (opTracker != null && opTracker.getNumActiveOperations() == 0
+                    && dsr.getDatasetInfo().getReferenceCount() == 0
+                    && dsr.getDatasetInfo().isOpen()
+                    && dsr.getDatasetInfo().getDatasetID() >= getFirstAvilableUserDatasetID()) {
+                closeDataset(dsr.getDatasetInfo());
                 return true;
             }
         }
@@ -240,15 +218,15 @@
     }
 
     private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
-        if (iInfo.isOpen) {
-            ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+        if (iInfo.isOpen()) {
+            ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
+            accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
         }
 
         // Wait for the above flush op.
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     //notification will come from DatasetInfo class (undeclareActiveIOOperation)
                     dsInfo.wait();
@@ -259,16 +237,29 @@
         }
     }
 
+    public DatasetResource getDatasetLifecycle(int did) {
+        DatasetResource dsr = datasets.get(did);
+        if (dsr != null) {
+            return dsr;
+        }
+        synchronized (datasets) {
+            dsr = datasets.get(did);
+            if (dsr == null) {
+                DatasetInfo dsInfo = new DatasetInfo(did);
+                PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo);
+                DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
+                        getFirstAvilableUserDatasetID(),
+                        getNumPartitions());
+                dsr = new DatasetResource(dsInfo, opTracker, vbcs);
+                datasets.put(did, dsr);
+            }
+            return dsr;
+        }
+    }
+
     @Override
     public DatasetInfo getDatasetInfo(int datasetID) {
-        synchronized (datasetInfos) {
-            DatasetInfo dsInfo = datasetInfos.get(datasetID);
-            if (dsInfo == null) {
-                dsInfo = new DatasetInfo(datasetID);
-                datasetInfos.put(datasetID, dsInfo);
-            }
-            return dsInfo;
-        }
+        return getDatasetLifecycle(datasetID).getDatasetInfo();
     }
 
     @Override
@@ -276,35 +267,34 @@
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null) {
+        DatasetResource dsr = datasets.get(did);
+        if (dsr == null) {
             throw new HyracksDataException("No index found with resourceID " + resourceID);
         }
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+        IndexInfo iInfo = dsr.getIndexInfo(resourceID);
         if (iInfo == null) {
             throw new HyracksDataException("No index found with resourceID " + resourceID);
         }
         iInfo.untouch();
-        dsInfo.untouch();
+        dsr.untouch();
     }
 
     @Override
-    public synchronized List<IIndex> getOpenIndexes() {
+    public synchronized List<IIndex> getOpenResources() {
         List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
         List<IIndex> openIndexes = new ArrayList<IIndex>();
         for (IndexInfo iInfo : openIndexesInfo) {
-            openIndexes.add(iInfo.index);
+            openIndexes.add(iInfo.getIndex());
         }
         return openIndexes;
     }
 
     @Override
     public synchronized List<IndexInfo> getOpenIndexesInfo() {
-        List<IndexInfo> openIndexesInfo = new ArrayList<IndexInfo>();
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                if (iInfo.isOpen) {
+        List<IndexInfo> openIndexesInfo = new ArrayList<>();
+        for (DatasetResource dsr : datasets.values()) {
+            for (IndexInfo iInfo : dsr.getIndexes().values()) {
+                if (iInfo.isOpen()) {
                     openIndexesInfo.add(iInfo);
                 }
             }
@@ -313,46 +303,23 @@
     }
 
     private DatasetVirtualBufferCaches getVirtualBufferCaches(int datasetID) {
-        synchronized (datasetVirtualBufferCachesMap) {
-            DatasetVirtualBufferCaches vbcs = datasetVirtualBufferCachesMap.get(datasetID);
-            if (vbcs == null) {
-                vbcs = initializeDatasetVirtualBufferCache(datasetID);
-            }
-            return vbcs;
-        }
+        return getDatasetLifecycle(datasetID).getVirtualBufferCaches();
     }
 
     @Override
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum) {
         DatasetVirtualBufferCaches dvbcs = getVirtualBufferCaches(datasetID);
-        return dvbcs.getVirtualBufferCaches(ioDeviceNum);
+        return dvbcs.getVirtualBufferCaches(this, ioDeviceNum);
     }
 
     private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
         deallocateDatasetMemory(datasetID);
-        datasetInfos.remove(datasetID);
-        datasetVirtualBufferCachesMap.remove(datasetID);
-        datasetOpTrackers.remove(datasetID);
-    }
-
-    private DatasetVirtualBufferCaches initializeDatasetVirtualBufferCache(int datasetID) {
-        synchronized (datasetVirtualBufferCachesMap) {
-            DatasetVirtualBufferCaches dvbcs = new DatasetVirtualBufferCaches(datasetID);
-            datasetVirtualBufferCachesMap.put(datasetID, dvbcs);
-            return dvbcs;
-        }
+        datasets.remove(datasetID);
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(int datasetID) {
-        synchronized (datasetOpTrackers) {
-            ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
-            if (opTracker == null) {
-                opTracker = new PrimaryIndexOperationTracker(datasetID, logManager, getDatasetInfo(datasetID));
-                datasetOpTrackers.put(datasetID, opTracker);
-            }
-            return opTracker;
-        }
+    public PrimaryIndexOperationTracker getOperationTracker(int datasetID) {
+        return datasets.get(datasetID).getOpTracker();
     }
 
     private void validateDatasetLifecycleManagerState() throws HyracksDataException {
@@ -361,145 +328,6 @@
         }
     }
 
-    private static abstract class Info {
-        protected int referenceCount;
-        protected boolean isOpen;
-
-        public Info() {
-            referenceCount = 0;
-            isOpen = false;
-        }
-
-        public void touch() {
-            ++referenceCount;
-        }
-
-        public void untouch() {
-            --referenceCount;
-        }
-    }
-
-    public static class IndexInfo extends Info {
-        private final ILSMIndex index;
-        private final long resourceId;
-        private final int datasetId;
-
-        public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
-            this.index = index;
-            this.datasetId = datasetId;
-            this.resourceId = resourceId;
-        }
-
-        public ILSMIndex getIndex() {
-            return index;
-        }
-
-        public long getResourceId() {
-            return resourceId;
-        }
-
-        public int getDatasetId() {
-            return datasetId;
-        }
-    }
-
-    public static class DatasetInfo extends Info implements Comparable<DatasetInfo> {
-        private final Map<Long, IndexInfo> indexes;
-        private final int datasetID;
-        private long lastAccess;
-        private int numActiveIOOps;
-        private boolean isExternal;
-        private boolean isRegistered;
-        private boolean memoryAllocated;
-        private boolean durable;
-
-        public DatasetInfo(int datasetID) {
-            this.indexes = new HashMap<Long, IndexInfo>();
-            this.lastAccess = -1;
-            this.datasetID = datasetID;
-            this.isRegistered = false;
-            this.memoryAllocated = false;
-        }
-
-        @Override
-        public void touch() {
-            super.touch();
-            lastAccess = System.currentTimeMillis();
-        }
-
-        @Override
-        public void untouch() {
-            super.untouch();
-            lastAccess = System.currentTimeMillis();
-        }
-
-        public synchronized void declareActiveIOOperation() {
-            numActiveIOOps++;
-        }
-
-        public synchronized void undeclareActiveIOOperation() {
-            numActiveIOOps--;
-            //notify threads waiting on this dataset info
-            notifyAll();
-        }
-
-        public synchronized Set<ILSMIndex> getDatasetIndexes() {
-            Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
-            for (IndexInfo iInfo : indexes.values()) {
-                if (iInfo.isOpen) {
-                    datasetIndexes.add(iInfo.index);
-                }
-            }
-
-            return datasetIndexes;
-        }
-
-        @Override
-        public int compareTo(DatasetInfo i) {
-            // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
-            //
-            // Example sort order:
-            // -------------------
-            // (F, 0, 70)       <-- largest
-            // (F, 0, 60)
-            // (T, 10, 80)
-            // (T, 10, 70)
-            // (T, 9, 90)
-            // (T, 0, 100)      <-- smallest
-            if (isOpen && !i.isOpen) {
-                return -1;
-            } else if (!isOpen && i.isOpen) {
-                return 1;
-            } else {
-                if (referenceCount < i.referenceCount) {
-                    return -1;
-                } else if (referenceCount > i.referenceCount) {
-                    return 1;
-                } else {
-                    if (lastAccess < i.lastAccess) {
-                        return -1;
-                    } else if (lastAccess > i.lastAccess) {
-                        return 1;
-                    } else {
-                        return 0;
-                    }
-                }
-            }
-
-        }
-
-        @Override
-        public String toString() {
-            return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
-                    + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
-                    + memoryAllocated + ", isDurable: " + durable;
-        }
-
-        public boolean isDurable() {
-            return durable;
-        }
-    }
-
     @Override
     public synchronized void start() {
         used = 0;
@@ -507,30 +335,29 @@
 
     @Override
     public synchronized void flushAllDatasets() throws HyracksDataException {
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            flushDatasetOpenIndexes(dsInfo, false);
+        for (DatasetResource dsr : datasets.values()) {
+            flushDatasetOpenIndexes(dsr.getDatasetInfo(), false);
         }
     }
 
     @Override
     public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
-        DatasetInfo datasetInfo = datasetInfos.get(datasetId);
-        if (datasetInfo != null) {
-            flushDatasetOpenIndexes(datasetInfo, asyncFlush);
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr != null) {
+            flushDatasetOpenIndexes(dsr.getDatasetInfo(), asyncFlush);
         }
     }
 
     @Override
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
         //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(
-                    dsInfo.datasetID);
+        for (DatasetResource dsr : datasets.values()) {
+            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
             synchronized (opTracker) {
-                for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
+                for (IndexInfo iInfo : dsr.getIndexes().values()) {
+                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
                             .getIOOperationCallback();
-                    if (!(((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty()
+                    if (!(((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()
                             || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
                             || opTracker.isFlushOnExit())) {
                         long firstLSN = ioCallback.getFirstLSN();
@@ -552,10 +379,10 @@
      * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
      */
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
-        if (!dsInfo.isExternal && dsInfo.durable) {
+        if (!dsInfo.isExternal() && dsInfo.isDurable()) {
             synchronized (logRecord) {
-                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, logManager.getNodeId(),
-                        dsInfo.indexes.size());
+                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null, logManager.getNodeId(),
+                        dsInfo.getIndexes().size());
                 try {
                     logManager.log(logRecord);
                 } catch (ACIDException e) {
@@ -569,22 +396,22 @@
                     throw new HyracksDataException(e);
                 }
             }
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
                 //update resource lsn
-                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.index
+                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
                         .getIOOperationCallback();
                 ioOpCallback.updateLastLSN(logRecord.getLSN());
             }
         }
 
         if (asyncFlush) {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+                ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
-                accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
+                accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
             }
         } else {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
                 // TODO: This is not efficient since we flush the indexes sequentially.
                 // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
                 // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
@@ -596,7 +423,7 @@
     private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
         // First wait for any ongoing IO operations
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     dsInfo.wait();
                 } catch (InterruptedException e) {
@@ -609,34 +436,33 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        for (IndexInfo iInfo : dsInfo.indexes.values()) {
-            if (iInfo.isOpen) {
-                ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
+        for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+            if (iInfo.isOpen()) {
+                ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
                 synchronized (opTracker) {
-                    iInfo.index.deactivate(false);
+                    iInfo.getIndex().deactivate(false);
                 }
-                iInfo.isOpen = false;
+                iInfo.setOpen(false);
             }
-            assert iInfo.referenceCount == 0;
         }
-        removeDatasetFromCache(dsInfo.datasetID);
-        dsInfo.isOpen = false;
+        removeDatasetFromCache(dsInfo.getDatasetID());
+        dsInfo.setOpen(false);
     }
 
     @Override
     public synchronized void closeAllDatasets() throws HyracksDataException {
-        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
-        for (DatasetInfo dsInfo : openDatasets) {
-            closeDataset(dsInfo);
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            closeDataset(dsr.getDatasetInfo());
         }
     }
 
     @Override
     public synchronized void closeUserDatasets() throws HyracksDataException {
-        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
-        for (DatasetInfo dsInfo : openDatasets) {
-            if (dsInfo.datasetID >= firstAvilableUserDatasetID) {
-                closeDataset(dsInfo);
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            if (dsr.getDatasetID() >= getFirstAvilableUserDatasetID()) {
+                closeDataset(dsr.getDatasetInfo());
             }
         }
     }
@@ -652,9 +478,7 @@
 
         closeAllDatasets();
 
-        datasetVirtualBufferCachesMap.clear();
-        datasetOpTrackers.clear();
-        datasetInfos.clear();
+        datasets.clear();
         stopped = true;
     }
 
@@ -673,57 +497,43 @@
 
         sb.append("[Datasets]\n");
         sb.append(String.format(dsHeaderFormat, "DatasetID", "Open", "Reference Count", "Last Access"));
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
+        for (DatasetResource dsr : datasets.values()) {
+            DatasetInfo dsInfo = dsr.getDatasetInfo();
             sb.append(
-                    String.format(dsFormat, dsInfo.datasetID, dsInfo.isOpen, dsInfo.referenceCount, dsInfo.lastAccess));
+                    String.format(dsFormat, dsInfo.getDatasetID(), dsInfo.isOpen(), dsInfo.getReferenceCount(),
+                            dsInfo.getLastAccess()));
         }
         sb.append("\n");
 
         sb.append("[Indexes]\n");
         sb.append(String.format(idxHeaderFormat, "DatasetID", "ResourceID", "Open", "Reference Count", "Index"));
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            for (Map.Entry<Long, IndexInfo> entry : dsInfo.indexes.entrySet()) {
+        for (DatasetResource dsr : datasets.values()) {
+            DatasetInfo dsInfo = dsr.getDatasetInfo();
+            for (Map.Entry<Long, IndexInfo> entry : dsInfo.getIndexes().entrySet()) {
                 IndexInfo iInfo = entry.getValue();
-                sb.append(String.format(idxFormat, dsInfo.datasetID, entry.getKey(), iInfo.isOpen, iInfo.referenceCount,
-                        iInfo.index));
+                sb.append(String.format(idxFormat, dsInfo.getDatasetID(), entry.getKey(), iInfo.isOpen(),
+                        iInfo.getReferenceCount(),
+                        iInfo.getIndex()));
             }
         }
-
         outputStream.write(sb.toString().getBytes());
     }
 
-    private synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetId);
-        if (dsInfo == null) {
+    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr == null) {
             throw new HyracksDataException(
                     "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
-        synchronized (dsInfo) {
-            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
-            if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
-                long additionalSize = getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
-                while (used + additionalSize > capacity) {
-                    if (!evictCandidateDataset()) {
-                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
-                                + " memory since memory budget would be exceeded.");
-                    }
-                }
-                used += additionalSize;
-                dsInfo.memoryAllocated = true;
-            }
-        }
-    }
-
-    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (dsInfo == null) {
             throw new HyracksDataException(
                     "Failed to deallocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
         synchronized (dsInfo) {
-            if (dsInfo.isOpen && dsInfo.memoryAllocated) {
-                used -= getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
-                dsInfo.memoryAllocated = false;
+            if (dsInfo.isOpen() && dsInfo.isMemoryAllocated()) {
+                used -= getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
+                dsInfo.setMemoryAllocated(false);
             }
         }
     }
@@ -731,54 +541,34 @@
     @Override
     public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
         //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
-        int did = Integer.parseInt(resourcePath);
-        allocateDatasetMemory(did);
-    }
-
-    private class DatasetVirtualBufferCaches {
-        private final int datasetID;
-        private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
-
-        public DatasetVirtualBufferCaches(int datasetID) {
-            this.datasetID = datasetID;
+        int datasetId = Integer.parseInt(resourcePath);
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr == null) {
+            throw new HyracksDataException(
+                    "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
-
-        private List<IVirtualBufferCache> initializeVirtualBufferCaches(int ioDeviceNum) {
-            assert ioDeviceVirtualBufferCaches.size() < numPartitions;
-            int numPages = datasetID < firstAvilableUserDatasetID
-                    ? storageProperties.getMetadataMemoryComponentNumPages()
-                    : storageProperties.getMemoryComponentNumPages();
-            List<IVirtualBufferCache> vbcs = new ArrayList<>();
-            for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
-                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
-                        new VirtualBufferCache(
-                                new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
-                                        Integer.toString(datasetID)),
-                                storageProperties.getMemoryComponentPageSize(),
-                                numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
-                vbcs.add(vbc);
-            }
-            ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
-            return vbcs;
-        }
-
-        public List<IVirtualBufferCache> getVirtualBufferCaches(int ioDeviceNum) {
-            synchronized (ioDeviceVirtualBufferCaches) {
-                List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
-                if (vbcs == null) {
-                    vbcs = initializeVirtualBufferCaches(ioDeviceNum);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        synchronized (dsInfo) {
+            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
+            if (!dsInfo.isMemoryAllocated() && !dsInfo.isExternal()) {
+                long additionalSize = getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
+                while (used + additionalSize > capacity) {
+                    if (!evictCandidateDataset()) {
+                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
+                                + " memory since memory budget would be exceeded.");
+                    }
                 }
-                return vbcs;
+                used += additionalSize;
+                dsInfo.setMemoryAllocated(true);
             }
         }
-
-        public long getTotalSize() {
-            int numPages = datasetID < firstAvilableUserDatasetID
-                    ? storageProperties.getMetadataMemoryComponentNumPages()
-                    : storageProperties.getMemoryComponentNumPages();
-
-            return storageProperties.getMemoryComponentPageSize() * numPages;
-        }
     }
 
+    public int getFirstAvilableUserDatasetID() {
+        return firstAvilableUserDatasetID;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
new file mode 100644
index 0000000..01f2e2b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+/**
+ * A dataset can be in one of two states { EVICTED , LOADED }.
+ * When a dataset is created, it is in the EVICTED state. In the EVICTED state, the allowed operations are:
+ * 1. DELETE: delete the dataset completely.
+ * 2. LOAD: move the dataset to the LOADED state.
+ * When a dataset is in the LOADED state, the allowed operations are:
+ * 1. OPEN: increment the open counter.
+ * 2. ALLOCATE RESOURCES (memory)
+ * 3. DEALLOCATE RESOURCES (memory)
+ * 4. CLOSE: decrement the open counter.
+ * 5. EVICT: deallocate resources and unload the dataset moving it to the EVICTED state
+ */
+public class DatasetResource implements Comparable<DatasetResource> {
+    private final DatasetInfo datasetInfo;
+    private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
+    private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
+
+    public DatasetResource(DatasetInfo datasetInfo,
+            PrimaryIndexOperationTracker datasetPrimaryOpTracker,
+            DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
+        this.datasetInfo = datasetInfo;
+        this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
+        this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
+    }
+
+    public boolean isRegistered() {
+        return datasetInfo.isRegistered();
+    }
+
+    public IndexInfo getIndexInfo(long resourceID) {
+        return datasetInfo.getIndexes().get(resourceID);
+    }
+
+    public boolean isOpen() {
+        return datasetInfo.isOpen();
+    }
+
+    public boolean isExternal() {
+        return datasetInfo.isExternal();
+    }
+
+    public void open(boolean open) {
+        datasetInfo.setOpen(open);
+    }
+
+    public void touch() {
+        datasetInfo.touch();
+    }
+
+    public void untouch() {
+        datasetInfo.untouch();
+    }
+
+    public DatasetVirtualBufferCaches getVirtualBufferCaches() {
+        return datasetVirtualBufferCaches;
+    }
+
+    public IIndex getIndex(long resourceID) {
+        IndexInfo iInfo = getIndexInfo(resourceID);
+        if (iInfo == null) {
+            return null;
+        }
+        return iInfo.getIndex();
+    }
+
+    public void register(long resourceID, IIndex index) throws HyracksDataException {
+        if (!datasetInfo.isRegistered()) {
+            synchronized (datasetInfo) {
+                if (!datasetInfo.isRegistered()) {
+                    datasetInfo.setExternal(!index.hasMemoryComponents());
+                    datasetInfo.setRegistered(true);
+                    datasetInfo.setDurable(((ILSMIndex) index).isDurable());
+                }
+            }
+        }
+        if (datasetInfo.getIndexes().containsKey(resourceID)) {
+            throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
+        }
+        datasetInfo.getIndexes().put(resourceID,
+                new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(), resourceID));
+    }
+
+    public DatasetInfo getDatasetInfo() {
+        return datasetInfo;
+    }
+
+    public PrimaryIndexOperationTracker getOpTracker() {
+        return datasetPrimaryOpTracker;
+    }
+
+    @Override
+    public int compareTo(DatasetResource o) {
+        return datasetInfo.compareTo(o.datasetInfo);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof DatasetResource) {
+            return datasetInfo.equals(((DatasetResource) obj).datasetInfo);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return datasetInfo.hashCode();
+    }
+
+    public Map<Long, IndexInfo> getIndexes() {
+        return datasetInfo.getIndexes();
+    }
+
+    public int getDatasetID() {
+        return datasetInfo.getDatasetID();
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
new file mode 100644
index 0000000..df85509
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
+import org.apache.hyracks.storage.common.IResourceMemoryManager;
+import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
+
+public class DatasetVirtualBufferCaches {
+    private final int datasetID;
+    private final AsterixStorageProperties storageProperties;
+    private final int firstAvilableUserDatasetID;
+    private final int numPartitions;
+    private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
+
+    public DatasetVirtualBufferCaches(int datasetID, AsterixStorageProperties storageProperties,
+            int firstAvilableUserDatasetID, int numPartitions) {
+        this.datasetID = datasetID;
+        this.storageProperties = storageProperties;
+        this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
+        this.numPartitions = numPartitions;
+    }
+
+    public List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
+            int ioDeviceNum) {
+        int numPages = datasetID < firstAvilableUserDatasetID
+                ? storageProperties.getMetadataMemoryComponentNumPages()
+                : storageProperties.getMemoryComponentNumPages();
+        List<IVirtualBufferCache> vbcs = new ArrayList<>();
+        for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+            MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+                    new VirtualBufferCache(
+                            new ResourceHeapBufferAllocator(memoryManager,
+                                    Integer.toString(datasetID)),
+                            storageProperties.getMemoryComponentPageSize(),
+                            numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
+            vbcs.add(vbc);
+        }
+        ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
+        return vbcs;
+    }
+
+    public List<IVirtualBufferCache> getVirtualBufferCaches(IResourceMemoryManager memoryManager, int ioDeviceNum) {
+        synchronized (ioDeviceVirtualBufferCaches) {
+            List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
+            if (vbcs == null) {
+                vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum);
+            }
+            return vbcs;
+        }
+    }
+
+    public long getTotalSize() {
+        int numPages = datasetID < firstAvilableUserDatasetID
+                ? storageProperties.getMetadataMemoryComponentNumPages()
+                : storageProperties.getMemoryComponentNumPages();
+        return storageProperties.getMemoryComponentPageSize() * ((long) numPages);
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
new file mode 100644
index 0000000..34ccce0
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class IndexInfo extends Info {
+    private final ILSMIndex index;
+    private final long resourceId;
+    private final int datasetId;
+
+    public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
+        this.index = index;
+        this.datasetId = datasetId;
+        this.resourceId = resourceId;
+    }
+
+    public ILSMIndex getIndex() {
+        return index;
+    }
+
+    public long getResourceId() {
+        return resourceId;
+    }
+
+    public int getDatasetId() {
+        return datasetId;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
new file mode 100644
index 0000000..999eb34
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+public abstract class Info {
+    private int referenceCount;
+    private boolean isOpen;
+
+    public Info() {
+        referenceCount = 0;
+        isOpen = false;
+    }
+
+    public void touch() {
+        ++referenceCount;
+    }
+
+    public void untouch() {
+        --referenceCount;
+    }
+
+    public int getReferenceCount() {
+        return referenceCount;
+    }
+
+    public void setReferenceCount(int referenceCount) {
+        this.referenceCount = referenceCount;
+    }
+
+    public boolean isOpen() {
+        return isOpen;
+    }
+
+    public void setOpen(boolean isOpen) {
+        this.isOpen = isOpen;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 71c30d5..6350d73 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -22,7 +22,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
deleted file mode 100644
index 5737957..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-
-public abstract class AbstractApplicationMessage implements IApplicationMessage {
-    private static final long serialVersionUID = 1L;
-    protected long id;
-
-    @Override
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    @Override
-    public long getId() {
-        return id;
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index b93082d..dbd2139 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -22,31 +22,11 @@
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.service.IControllerService;
 
+@FunctionalInterface
 public interface IApplicationMessage extends IMessage {
 
     /**
-     * Sets a unique message id that identifies this message within an NC.
-     * This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
-     * when the callback is not null to notify the sender when the response to that message is received.
-     *
-     * @param messageId
-     */
-    public void setId(long messageId);
-
-    /**
-     * @return The unique message id if it has been set, otherwise 0.
-     */
-    public long getId();
-
-    /**
      * handle the message upon delivery
      */
-    public void handle(IControllerService cs) throws HyracksDataException;
-
-    /**
-     * get a string representation for the message type
-     *
-     * @return
-     */
-    public String type();
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
deleted file mode 100644
index 3bad5fb..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging.api;
-
-public interface IApplicationMessageCallback {
-
-    /**
-     * Notifies the message sender when the response has been received.
-     *
-     * @param message
-     *            The response message
-     */
-    public void deliverMessageResponse(IApplicationMessage message);
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 2290b93..ced2b6d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -21,7 +21,6 @@
 import org.apache.hyracks.api.messages.IMessageBroker;
 
 public interface ICCMessageBroker extends IMessageBroker {
-    public static final long NO_CALLBACK_MESSAGE_ID = -1;
 
     /**
      * Sends the passed message to the specified {@code nodeId}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index f01d0c3..707f864 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,7 @@
      * @param callback
      * @throws Exception
      */
-    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+    public void sendMessageToCC(IApplicationMessage message) throws Exception;
 
     /**
      * Sends application message from this NC to another NC.
@@ -38,7 +38,7 @@
      * @param callback
      * @throws Exception
      */
-    public void sendMessageToNC(String nodeId, IApplicationMessage message, IApplicationMessageCallback callback)
+    public void sendMessageToNC(String nodeId, IApplicationMessage message)
             throws Exception;
 
     /**
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index d0dd3fa..b66bcf8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public interface IAsterixAppRuntimeContextProvider {
 
@@ -49,8 +48,6 @@
 
     public ILocalResourceRepository getLocalResourceRepository();
 
-    public IResourceIdFactory getResourceIdFactory();
-
     public IIOManager getIOManager();
 
     public IAsterixAppRuntimeContext getAppContext();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index c929b41..6de796e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -407,7 +407,7 @@
             throws ACIDException, HyracksDataException, IndexException {
         long resourceID = metadataIndex.getResourceID();
         String resourceName = metadataIndex.getFile().toString();
-        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
+        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         try {
             datasetLifecycleManager.open(resourceName);
 
@@ -674,7 +674,7 @@
             throws ACIDException, HyracksDataException, IndexException {
         long resourceID = metadataIndex.getResourceID();
         String resourceName = metadataIndex.getFile().toString();
-        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
+        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         try {
             datasetLifecycleManager.open(resourceName);
             // prepare a Callback for logging
@@ -1066,7 +1066,7 @@
         try {
             IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
             String resourceName = index.getFile().toString();
-            IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            IIndex indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
@@ -1088,7 +1088,7 @@
             datasetLifecycleManager.close(resourceName);
 
             index = MetadataPrimaryIndexes.DATASET_DATASET;
-            indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
@@ -1113,7 +1113,7 @@
             datasetLifecycleManager.close(resourceName);
 
             index = MetadataPrimaryIndexes.INDEX_DATASET;
-            indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
@@ -1149,7 +1149,7 @@
             throw new MetadataException("No file for Index " + index.getDataverseName() + "." + index.getIndexName());
         }
         String resourceName = index.getFile().toString();
-        IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+        IIndex indexInstance = datasetLifecycleManager.get(resourceName);
         datasetLifecycleManager.open(resourceName);
         IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
@@ -1187,7 +1187,7 @@
         int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
         try {
             String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
-            IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            IIndex indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             try {
                 IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 2629fea..55b1045 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,9 +32,9 @@
 import org.apache.asterix.common.api.ILocalResourceMetadata;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.context.BaseOperationTracker;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
@@ -59,10 +59,10 @@
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Node;
-import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.metadata.entities.Node;
+import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.BuiltinType;
@@ -319,6 +319,8 @@
         String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
         FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
 
+        // this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for
+        // a dataset that was not yet created
         List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getDatasetLifecycleManager()
                 .getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum());
         ITypeTraits[] typeTraits = index.getTypeTraits();
@@ -360,7 +362,7 @@
                         + " to intialize as a new instance. (WARNING: all data will be lost.)");
             }
             resourceID = resource.getResourceId();
-            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
+            lsmBtree = (LSMBTree) dataLifecycleManager.get(absolutePath);
             if (lsmBtree == null) {
                 lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
                         typeTraits, comparatorFactories, bloomFilterKeyFields,
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 5a88a9f..c0d8320 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -46,7 +46,7 @@
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
+import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -59,7 +59,6 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -70,6 +69,7 @@
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
index 2eb1be9..7c386d5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
@@ -18,9 +18,9 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 
-public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage {
+public abstract class AbstractFailbackPlanMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     protected final long planId;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
index c96bcb8..5cb1a6a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -55,7 +55,8 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
+        sb.append(CompleteFailbackRequestMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
         sb.append(" Node ID: " + nodeId);
         sb.append(" Partitions: " + partitions);
         return sb.toString();
@@ -78,7 +79,7 @@
             CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId,
                     requestId, partitions);
             try {
-                broker.sendMessageToCC(reponse, null);
+                broker.sendMessageToCC(reponse);
             } catch (Exception e) {
                 LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
                 hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -88,9 +89,4 @@
             throw hde;
         }
     }
-
-    @Override
-    public String type() {
-        return "COMPLETE_FAILBACK_REQUEST";
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index 3c4b58c..73e5fd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -41,7 +41,8 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
+        sb.append(CompleteFailbackResponseMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
         sb.append(" Partitions: " + partitions);
         return sb.toString();
     }
@@ -50,9 +51,4 @@
     public void handle(IControllerService cs) throws HyracksDataException {
         AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(this);
     }
-
-    @Override
-    public String type() {
-        return "COMPLETE_FAILBACK_RESPONSE";
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
index e3b9fbe..c112366 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -64,7 +64,8 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
+        sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
         sb.append(" Partitions: " + partitions);
         sb.append(" releaseMetadataNode: " + releaseMetadataNode);
         return sb.toString();
@@ -109,15 +110,10 @@
         PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId,
                 requestId, partitions);
         try {
-            broker.sendMessageToCC(reponse, null);
+            broker.sendMessageToCC(reponse);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
             throw ExceptionUtils.convertToHyracksDataException(e);
         }
     }
-
-    @Override
-    public String type() {
-        return "PREPARE_PARTITIONS_FAILBACK_REQUEST";
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index 2e52773..299121a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -44,7 +44,7 @@
     }
 
     @Override
-    public String type() {
-        return "PREPARE_PARTITIONS_FAILBACK_RESPONSE";
+    public String toString() {
+        return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
index 2aa4746..fc55968 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.runtime.message;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.event.schema.cluster.Node;
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReplicaEventMessage extends AbstractApplicationMessage {
+public class ReplicaEventMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private final String nodeId;
@@ -66,7 +66,7 @@
     }
 
     @Override
-    public String type() {
-        return "REPLICA_EVENT";
+    public String toString() {
+        return ReplicaEventMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index 1604e29..f9f6233 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -23,7 +23,7 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
@@ -32,7 +32,7 @@
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+public class ReportMaxResourceIdMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
     private final long maxResourceId;
@@ -62,7 +62,7 @@
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
         try {
-            ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg, null);
+            ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
             throw ExceptionUtils.convertToHyracksDataException(e);
@@ -70,7 +70,7 @@
     }
 
     @Override
-    public String type() {
-        return "REPORT_MAX_RESOURCE_ID_RESPONSE";
+    public String toString() {
+        return ReportMaxResourceIdMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
index 3e2becf..203104e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
+public class ReportMaxResourceIdRequestMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
 
     @Override
@@ -32,7 +32,7 @@
     }
 
     @Override
-    public String type() {
-        return "REPORT_MAX_RESOURCE_ID_REQUEST";
+    public String toString() {
+        return ReportMaxResourceIdRequestMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 5e7d808..32d3f64 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -21,7 +21,7 @@
 import java.util.Set;
 
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+public class ResourceIdRequestMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
     private final String src;
 
@@ -43,7 +43,6 @@
             ICCMessageBroker broker =
                     (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
             ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
-            reponse.setId(id);
             if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
                 reponse.setResourceId(-1);
                 reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
@@ -66,7 +65,6 @@
             throws Exception {
         Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
         ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
-        msg.setId(ICCMessageBroker.NO_CALLBACK_MESSAGE_ID);
         for (String nodeId : getParticipantNodes) {
             if (!resourceIdManager.reported(nodeId)) {
                 broker.sendApplicationMessageToNC(msg, nodeId);
@@ -75,7 +73,7 @@
     }
 
     @Override
-    public String type() {
-        return "RESOURCE_ID_REQUEST";
+    public String toString() {
+        return ReportMaxResourceIdRequestMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index 62c2163..d4cc022 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -18,11 +18,14 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.runtime.transaction.GlobalResourceIdFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
+public class ResourceIdRequestResponseMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
 
     private long resourceId;
@@ -45,13 +48,15 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
-        // Do nothing. for this message, the callback handles it, we probably should get rid of callbacks and
-        // instead, use the handle in the response to perform callback action
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext asterixNcAppRuntimeCtx =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        ((GlobalResourceIdFactory) asterixNcAppRuntimeCtx.getResourceIdFactory()).addNewIds(this);
     }
 
     @Override
-    public String type() {
-        return "RESOURCE_ID_RESPONSE";
+    public String toString() {
+        return ResourceIdRequestResponseMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
index b18a879..e877f52 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -23,13 +23,13 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+public class TakeoverMetadataNodeRequestMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
@@ -51,7 +51,7 @@
             TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
                     appContext.getTransactionSubsystem().getId());
             try {
-                broker.sendMessageToCC(reponse, null);
+                broker.sendMessageToCC(reponse);
             } catch (Exception e) {
                 LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
                 hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -63,7 +63,7 @@
     }
 
     @Override
-    public String type() {
-        return "TAKEOVER_METADATA_NODE_REQUEST";
+    public String toString() {
+        return TakeoverMetadataNodeRequestMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index f7016bc..e18fc8d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+public class TakeoverMetadataNodeResponseMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private final String nodeId;
@@ -42,7 +42,7 @@
     }
 
     @Override
-    public String type() {
-        return "TAKEOVER_METADATA_NODE_RESPONSE";
+    public String toString() {
+        return TakeoverMetadataNodeResponseMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
index 3b91084..e024eed 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -25,14 +25,14 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+public class TakeoverPartitionsRequestMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
@@ -61,7 +61,8 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Request ID: " + requestId);
+        sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName());
+        sb.append(" Request ID: " + requestId);
         sb.append(" Node ID: " + nodeId);
         sb.append(" Partitions: ");
         for (Integer partitionId : partitions) {
@@ -92,7 +93,7 @@
                 TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
                         appContext.getTransactionSubsystem().getId(), partitions);
                 try {
-                    broker.sendMessageToCC(reponse, null);
+                    broker.sendMessageToCC(reponse);
                 } catch (Exception e) {
                     LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
                     hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -103,9 +104,4 @@
             }
         }
     }
-
-    @Override
-    public String type() {
-        return "TAKEOVER_PARTITIONS_REQUEST";
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index 9ec71b7..1c57a2a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+public class TakeoverPartitionsResponseMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private final Integer[] partitions;
@@ -54,7 +54,7 @@
     }
 
     @Override
-    public String type() {
-        return "TAKEOVER_PARTITIONS_RESPONSE";
+    public String toString() {
+        return TakeoverPartitionsResponseMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 4cb65c2..a8b61f7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -20,8 +20,6 @@
 
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
 import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -34,10 +32,10 @@
  * A resource id factory that generates unique resource ids across all NCs by requesting
  * unique ids from the cluster controller.
  */
-public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
+public class GlobalResourceIdFactory implements IResourceIdFactory {
 
     private final IApplicationContext appCtx;
-    private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
+    private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
     private final String nodeId;
 
     public GlobalResourceIdFactory(IApplicationContext appCtx) {
@@ -46,6 +44,10 @@
         this.nodeId = ((NodeControllerService) appCtx.getControllerService()).getApplicationContext().getNodeId();
     }
 
+    public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+        resourceIdResponseQ.put(resourceIdResponse);
+    }
+
     @Override
     public long createId() throws HyracksDataException {
         try {
@@ -54,15 +56,15 @@
             if (!resourceIdResponseQ.isEmpty()) {
                 synchronized (resourceIdResponseQ) {
                     if (!resourceIdResponseQ.isEmpty()) {
-                        reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+                        reponse = resourceIdResponseQ.take();
                     }
                 }
             }
             //if no response available or it has an exception, request a new one
             if (reponse == null || reponse.getException() != null) {
                 ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
-                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
-                reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg);
+                reponse = resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
                     throw new HyracksDataException(reponse.getException().getMessage());
                 }
@@ -72,9 +74,4 @@
             throw new HyracksDataException(e);
         }
     }
-
-    @Override
-    public void deliverMessageResponse(IApplicationMessage message) {
-        resourceIdResponseQ.offer(message);
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
index d201d60..d9a55ad 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
@@ -411,7 +411,7 @@
                 for (Integer partitionId : request.getPartitions()) {
                     nodePartitions.add(clusterPartitions.get(partitionId));
                 }
-                failedTakeoverRequests.add(request.getId());
+                failedTakeoverRequests.add(request.getRequestId());
             }
         }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 52e2818..e5f3555 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -59,9 +59,9 @@
             throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager =
+        IResourceLifecycleManager indexLifeCycleManager =
                 txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index c6743dd..f9f2b89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -55,9 +55,9 @@
             int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 8c91c1a..df7d65d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -53,9 +53,9 @@
             int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 3e11531..39e916b4 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -53,9 +53,9 @@
             int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 5bf1505..85cdc89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -55,9 +55,9 @@
             throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager =
+        IResourceLifecycleManager indexLifeCycleManager =
                 txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index c3cf3e0..851289e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -18,21 +18,25 @@
  */
 package org.apache.asterix.transaction.management.service.recovery;
 
+import java.io.IOError;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 
 public class CheckpointThread extends Thread {
 
+    private static final Logger LOGGER = Logger.getLogger(CheckpointThread.class.getName());
     private long lsnThreshold;
     private long checkpointTermInSecs;
 
     private final ILogManager logManager;
     private final IRecoveryManager recoveryMgr;
 
-    public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager, ILogManager logManager,
+    public CheckpointThread(IRecoveryManager recoveryMgr, ILogManager logManager,
             long lsnThreshold, long checkpointTermInSecs) {
         this.recoveryMgr = recoveryMgr;
         this.logManager = logManager;
@@ -45,24 +49,25 @@
 
         Thread.currentThread().setName("Checkpoint Thread");
 
-        long currentCheckpointAttemptMinLSN = -1;
+        long currentCheckpointAttemptMinLSN;
         long lastCheckpointLSN = -1;
-        long currentLogLSN = 0;
-        long targetCheckpointLSN = 0;
+        long currentLogLSN;
+        long targetCheckpointLSN;
         while (true) {
             try {
                 sleep(checkpointTermInSecs * 1000);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 //ignore
             }
 
-
-            if(lastCheckpointLSN == -1)
-            {
+            if (lastCheckpointLSN == -1) {
                 try {
-                    //Since the system just started up after sharp checkpoint, last checkpoint LSN is considered as the min LSN of the current log partition
+                    //Since the system just started up after sharp checkpoint,
+                    //last checkpoint LSN is considered as the min LSN of the current log partition
                     lastCheckpointLSN = logManager.getReadableSmallestLSN();
                 } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Error getting smallest readable LSN", e);
                     lastCheckpointLSN = 0;
                 }
             }
@@ -82,13 +87,12 @@
                     currentCheckpointAttemptMinLSN = recoveryMgr.checkpoint(false, targetCheckpointLSN);
 
                     //checkpoint was completed at target LSN or above
-                    if(currentCheckpointAttemptMinLSN >= targetCheckpointLSN)
-                    {
+                    if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
                         lastCheckpointLSN = currentCheckpointAttemptMinLSN;
                     }
 
                 } catch (ACIDException | HyracksDataException e) {
-                    throw new Error("failed to checkpoint", e);
+                    throw new IOError(e);
                 }
             }
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 0183b29..eb22c22 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -367,7 +367,7 @@
                                 String resourceAbsolutePath =
                                         partitionIODevicePath + File.separator + localResource.getResourceName();
                                 localResource.setResourcePath(resourceAbsolutePath);
-                                index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
+                                index = (ILSMIndex) datasetLifecycleManager.get(resourceAbsolutePath);
                                 if (index == null) {
                                     //#. create index instance and register to indexLifeCycleManager
                                     localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
@@ -568,7 +568,7 @@
     public long getLocalMinFirstLSN() throws HyracksDataException {
         IDatasetLifecycleManager datasetLifecycleManager =
                 txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-        List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
+        List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index f7ed355..6b25542 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -81,8 +81,7 @@
         this.recoveryManager = new RecoveryManager(this);
 
         if (asterixAppRuntimeContextProvider != null) {
-            this.checkpointThread = new CheckpointThread(recoveryManager,
-                    asterixAppRuntimeContextProvider.getDatasetLifecycleManager(), logManager,
+            this.checkpointThread = new CheckpointThread(recoveryManager, logManager,
                     this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
             this.checkpointThread.start();
         } else {
@@ -95,22 +94,27 @@
         }
     }
 
+    @Override
     public ILogManager getLogManager() {
         return logManager;
     }
 
+    @Override
     public ILockManager getLockManager() {
         return lockManager;
     }
 
+    @Override
     public ITransactionManager getTransactionManager() {
         return transactionManager;
     }
 
+    @Override
     public IRecoveryManager getRecoveryManager() {
         return recoveryManager;
     }
 
+    @Override
     public IAsterixAppRuntimeContextProvider getAsterixAppRuntimeContextProvider() {
         return asterixAppRuntimeContextProvider;
     }
@@ -119,6 +123,7 @@
         return txnProperties;
     }
 
+    @Override
     public String getId() {
         return id;
     }
diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 3a4a56a..887532c 100644
--- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.locking;
 
+import static org.mockito.Mockito.mock;
+
 import java.util.concurrent.Executors;
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -31,9 +33,6 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
-
-import static org.mockito.Mockito.mock;
 
 class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
 
@@ -86,11 +85,6 @@
     }
 
     @Override
-    public ResourceIdFactory getResourceIdFactory() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public IIOManager getIOManager() {
         throw new UnsupportedOperationException();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
index ad19cf0..2f5a873 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
@@ -20,9 +20,6 @@
 
 import java.io.Serializable;
 
-/**
- * @author rico
- */
 public interface IMessage extends Serializable {
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
index 2b166d2..8aaa563 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
@@ -18,9 +18,6 @@
  */
 package org.apache.hyracks.api.messages;
 
-/**
- * @author rico
- */
 public interface IMessageBroker {
 
     public void receivedMessage(IMessage message, String nodeId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
index 5228f80..960a11c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
@@ -19,14 +19,14 @@
 package org.apache.hyracks.examples.btree.helper;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 
 public enum IndexLifecycleManagerProvider implements IIndexLifecycleManagerProvider {
     INSTANCE;
 
     @Override
-    public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+    public IResourceLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
         return RuntimeContext.get(ctx).getIndexLifecycleManager();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 2e96527..a4909ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
@@ -46,7 +46,7 @@
     private IBufferCache bufferCache;
     private IFileMapManager fileMapManager;
     private ILocalResourceRepository localResourceRepository;
-    private IIndexLifecycleManager lcManager;
+    private IResourceLifecycleManager lcManager;
     private ResourceIdFactory resourceIdFactory;
     private ThreadFactory threadFactory = new ThreadFactory() {
         public Thread newThread(Runnable r) {
@@ -90,7 +90,7 @@
         return resourceIdFactory;
     }
 
-    public IIndexLifecycleManager getIndexLifecycleManager() {
+    public IResourceLifecycleManager getIndexLifecycleManager() {
         return lcManager;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndex.java
index 8ea1f88..be68903 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndex.java
@@ -26,7 +26,7 @@
  * This interface describes the operations common to all indexes. Indexes
  * implementing this interface can easily reuse existing index operators for
  * dataflow. Users must perform operations on an via an {@link IIndexAccessor}.
- * During dataflow, the lifecycle of IIndexes are handled through an {@link IIndexLifecycleManager}.
+ * During dataflow, the lifecycle of IIndexes are handled through an {@link IResourceLifecycleManager}.
  */
 public interface IIndex {
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java
deleted file mode 100644
index 4c00b93..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.common.api;
-
-import java.util.List;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.common.IResourceMemoryManager;
-
-public interface IIndexLifecycleManager extends IResourceMemoryManager {
-    public List<IIndex> getOpenIndexes();
-
-    public void register(String resourcePath, IIndex index) throws HyracksDataException;
-
-    public void open(String resourcePath) throws HyracksDataException;
-
-    public void close(String resourcePath) throws HyracksDataException;
-
-    public IIndex getIndex(String resourcePath) throws HyracksDataException;
-
-    public void unregister(String resourcePath) throws HyracksDataException;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManagerProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManagerProvider.java
index 127ee83..5378610 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManagerProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManagerProvider.java
@@ -23,5 +23,5 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IIndexLifecycleManagerProvider extends Serializable {
-    public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx);
+    public IResourceLifecycleManager<IIndex> getLifecycleManager(IHyracksTaskContext ctx);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IResourceLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IResourceLifecycleManager.java
new file mode 100644
index 0000000..1d18a8f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IResourceLifecycleManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.api;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResourceMemoryManager;
+
+/**
+ * The base interface for resource management
+ *
+ * @param <R>
+ *            resource class
+ */
+public interface IResourceLifecycleManager<R> extends IResourceMemoryManager {
+    /**
+     * get a list of all resources which are opened
+     *
+     * @return
+     */
+    public List<R> getOpenResources();
+
+    /**
+     * Registers a resource.
+     *
+     * @param resourceId
+     * @param resource
+     * @throws HyracksDataException
+     *             if a resource is already registered with this resourceId
+     */
+    public void register(String resourceId, R resource) throws HyracksDataException;
+
+    /**
+     * Opens a resource. The resource is moved to the open state
+     *
+     * @param resourceId
+     * @throws HyracksDataException
+     *             if no resource with the resourceId exists
+     */
+    public void open(String resourceId) throws HyracksDataException;
+
+    /**
+     * closes a resource and free up its allocated resources
+     *
+     * @param resourceId
+     * @throws HyracksDataException
+     */
+    public void close(String resourceId) throws HyracksDataException;
+
+    /**
+     * gets the resource registered with this id
+     *
+     * @param resourceId
+     * @return the resource if registered or null
+     * @throws HyracksDataException
+     */
+    public R get(String resourceId) throws HyracksDataException;
+
+    /**
+     * unregister a resource removing its resources in memory and on disk
+     *
+     * @param resourceId
+     * @throws HyracksDataException
+     */
+    public void unregister(String resourceId) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 2e45c7c..202c0a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.frames.LIFOMetaDataFrame;
 import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.common.file.ILocalResourceFactory;
@@ -38,7 +38,7 @@
 
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
-    protected final IIndexLifecycleManager lcManager;
+    protected final IResourceLifecycleManager<IIndex> lcManager;
     protected final ILocalResourceRepository localResourceRepository;
     protected final IResourceIdFactory resourceIdFactory;
     protected final FileReference file;
@@ -72,8 +72,9 @@
     @Override
     public void create() throws HyracksDataException {
         synchronized (lcManager) {
-            index = lcManager.getIndex(resourcePath);
+            index = lcManager.get(resourcePath);
             if (index != null) {
+                //how is this right?????????? <needs to be fixed>
                 lcManager.unregister(resourcePath);
             } else {
                 index = createIndexInstance();
@@ -109,7 +110,7 @@
                 throw new HyracksDataException("Index does not have a valid resource ID. Has it been created yet?");
             }
 
-            index = lcManager.getIndex(resourcePath);
+            index = lcManager.get(resourcePath);
             if (index == null) {
                 index = createIndexInstance();
                 lcManager.register(resourcePath, index);
@@ -128,7 +129,7 @@
     @Override
     public void destroy() throws HyracksDataException {
         synchronized (lcManager) {
-            index = lcManager.getIndex(resourcePath);
+            index = lcManager.get(resourcePath);
             if (index != null) {
                 lcManager.unregister(resourcePath);
             } else {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index 5953abe..224283d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -30,9 +30,9 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 
-public class IndexLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
+public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>, ILifeCycleComponent {
     private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
 
     private final Map<String, IndexInfo> indexInfos;
@@ -131,6 +131,7 @@
             }
         }
 
+        @Override
         public String toString() {
             return "{index: " + index + ", isOpen: " + isOpen + ", refCount: " + referenceCount + ", lastAccess: "
                     + lastAccess + "}";
@@ -138,7 +139,7 @@
     }
 
     @Override
-    public List<IIndex> getOpenIndexes() {
+    public List<IIndex> getOpenResources() {
         List<IIndex> openIndexes = new ArrayList<IIndex>();
         for (IndexInfo i : indexInfos.values()) {
             if (i.isOpen) {
@@ -165,6 +166,7 @@
         }
     }
 
+    @Override
     public void dumpState(OutputStream os) throws IOException {
         StringBuilder sb = new StringBuilder();
 
@@ -212,7 +214,7 @@
     }
 
     @Override
-    public IIndex getIndex(String resourcePath) throws HyracksDataException {
+    public IIndex get(String resourcePath) throws HyracksDataException {
         IndexInfo info = indexInfos.get(resourcePath);
         return info == null ? null : info.index;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
index ab5be15..ef8f646 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
@@ -59,7 +59,7 @@
         synchronized (lcManager) {
             if (index == null) {
                 try {
-                    index = lcManager.getIndex(resourcePath);
+                    index = lcManager.get(resourcePath);
                 } catch (HyracksDataException e) {
                     return null;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
index 6e0beb3..e7a1161 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
@@ -60,7 +60,7 @@
         synchronized (lcManager) {
             if (index == null) {
                 try {
-                    index = lcManager.getIndex(resourcePath);
+                    index = lcManager.get(resourcePath);
                 } catch (HyracksDataException e) {
                     return null;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
index d2d86e6..c047777 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
@@ -23,7 +23,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 
 public interface ILSMMergePolicyFactory extends Serializable {
     // Having two methods to create the merge policy with two different signatures is hacky, but we do it now
@@ -32,7 +32,7 @@
     // in asterix and cannot be seen in hyracks. Thus we pass IHyracksTaskContext and let the merge policy do the casting.
     public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, IHyracksTaskContext ctx);
 
-    public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, IIndexLifecycleManager ilcm);
+    public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, IResourceLifecycleManager ilcm);
 
     public String getName();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
index 08672a5..e75aa7f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
@@ -24,7 +24,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -53,7 +53,7 @@
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new ConstantMergePolicy();
         policy.configure(properties);
         return policy;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
index b3b35f4..052e9af 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
@@ -25,7 +25,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -54,7 +54,7 @@
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new NoMergePolicy();
         policy.configure(properties);
         return policy;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
index 465a5f4..e1e05f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
@@ -25,7 +25,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -55,7 +55,7 @@
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new PrefixMergePolicy();
         policy.configure(properties);
         return policy;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeDataflowHelper.java
index 3503df5..9731371 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeDataflowHelper.java
@@ -77,7 +77,7 @@
         synchronized (lcManager) {
             if (index == null) {
                 try {
-                    index = lcManager.getIndex(resourcePath);
+                    index = lcManager.get(resourcePath);
                 } catch (HyracksDataException e) {
                     return null;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestIndexLifecycleManagerProvider.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestIndexLifecycleManagerProvider.java
index f48778e..409b49b 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestIndexLifecycleManagerProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestIndexLifecycleManagerProvider.java
@@ -19,16 +19,17 @@
 package org.apache.hyracks.test.support;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 
 public class TestIndexLifecycleManagerProvider implements IIndexLifecycleManagerProvider {
 
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
-        return TestStorageManagerComponentHolder.getIndexLifecycleManager(ctx);
+    public IResourceLifecycleManager<IIndex> getLifecycleManager(IHyracksTaskContext ctx) {
+        return TestStorageManagerComponentHolder.getIndexLifecycleManager();
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 482dccd..ee5f320 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -28,7 +28,8 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IIndex;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
@@ -51,13 +52,14 @@
     private static IFileMapProvider fileMapProvider;
     private static IOManager ioManager;
     private static ILocalResourceRepository localResourceRepository;
-    private static IIndexLifecycleManager lcManager;
+    private static IResourceLifecycleManager<IIndex> lcManager;
     private static ResourceIdFactory resourceIdFactory;
 
     private static int pageSize;
     private static int numPages;
     private static int maxOpenFiles;
     private final static ThreadFactory threadFactory = new ThreadFactory() {
+        @Override
         public Thread newThread(Runnable r) {
             return new Thread(r);
         }
@@ -73,7 +75,7 @@
         lcManager = null;
     }
 
-    public synchronized static IIndexLifecycleManager getIndexLifecycleManager(IHyracksTaskContext ctx) {
+    public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager() {
         if (lcManager == null) {
             lcManager = new IndexLifecycleManager();
         }
@@ -110,7 +112,8 @@
     public synchronized static ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
         if (localResourceRepository == null) {
             try {
-                ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
+                ILocalResourceRepositoryFactory localResourceRepositoryFactory =
+                        new TransientLocalResourceRepositoryFactory();
                 localResourceRepository = localResourceRepositoryFactory.createRepository();
             } catch (HyracksException e) {
                 //In order not to change the IStorageManagerInterface due to the test code, throw runtime exception.
