Small Cleanup Towards Fixing LifeCycle Issues

Before this change, dataset lifecycle manager was providing a set
of functionalities that are loosly related to management of datasets
and indexes. However, it was not clear what the possible states of
a dataset or an index and what is the responsibility of each object.

This change takes the first step towards fixing this area. Indexes
of a dataset are now grouped together under a single lifecycle class

A resource aka dataset must be created outside the lifecycle manager
and registered with it before it can be assigned resources (memory)
and before it can be used by any operation. This is still not the
case.

Change-Id: I84005a33837725f41ae63297a3711215dccce1d8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1148
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 50fa257..392bec8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -22,12 +22,12 @@
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ActiveManagerMessage extends AbstractApplicationMessage {
+public class ActiveManagerMessage implements IApplicationMessage {
     public static final byte STOP_ACTIVITY = 0x00;
 
     private static final long serialVersionUID = 1L;
@@ -62,7 +62,7 @@
     }
 
     @Override
-    public String type() {
-        return "ACTIVE_MANAGER_MESSAGE";
+    public String toString() {
+        return ActiveManagerMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 02affc4..fc67d3c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -22,12 +22,12 @@
 
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class ActivePartitionMessage extends AbstractApplicationMessage {
+public class ActivePartitionMessage implements IApplicationMessage {
 
     public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
     public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
@@ -70,7 +70,7 @@
     }
 
     @Override
-    public String type() {
-        return "ACTIVE_ENTITY_TO_CC_MESSAGE";
+    public String toString() {
+        return ActivePartitionMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index 265025f..ec0aaa8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -30,7 +30,6 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixAppRuntimeContextProviderForRecovery implements IAsterixAppRuntimeContextProvider {
 
@@ -76,11 +75,6 @@
     }
 
     @Override
-    public IResourceIdFactory getResourceIdFactory() {
-        return asterixAppRuntimeContext.getResourceIdFactory();
-    }
-
-    @Override
     public IIOManager getIOManager() {
         return asterixAppRuntimeContext.getIOManager();
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index 9127ee5..ce20c9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -60,7 +60,6 @@
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
@@ -73,6 +72,7 @@
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
 import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.application.IApplicationConfig;
@@ -105,6 +105,7 @@
 
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCApplicationContext ncApplicationContext;
+    private final IResourceIdFactory resourceIdFactory;
 
     private AsterixCompilerProperties compilerProperties;
     private AsterixExternalProperties externalProperties;
@@ -124,7 +125,6 @@
 
     private ILSMIOOperationScheduler lsmIOScheduler;
     private PersistentLocalResourceRepository localResourceRepository;
-    private IResourceIdFactory resourceIdFactory;
     private IIOManager ioManager;
     private boolean isShuttingdown;
 
@@ -171,6 +171,7 @@
         }
         allExtensions.addAll(new AsterixExtensionProperties(propertiesAccessor).getExtensions());
         ncExtensionManager = new NCExtensionManager(allExtensions);
+        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
 
     @Override
@@ -193,7 +194,7 @@
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
                 new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+                        ioManager, ncApplicationContext.getNodeId(), metadataProperties);
 
         localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
                 .createRepository();
@@ -209,7 +210,6 @@
             //delete any storage data before the resource factory is initialized
             localResourceRepository.deleteStorageData(true);
         }
-        initializeResourceIdFactory();
 
         datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager(),
@@ -290,6 +290,7 @@
         lccm.register((ILifeCycleComponent) datasetLifecycleManager);
         lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+
     }
 
     @Override
@@ -442,11 +443,6 @@
     }
 
     @Override
-    public void initializeResourceIdFactory() throws HyracksDataException {
-        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
-    }
-
-    @Override
     public void initializeMetadata(boolean newUniverse) throws Exception {
         IAsterixStateProxy proxy;
         if (LOGGER.isLoggable(Level.INFO)) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 1345aa7..d1d7ff7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -22,7 +22,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.hyracks.api.messages.IMessage;
@@ -41,9 +40,9 @@
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        IApplicationMessage absMessage = (IApplicationMessage) message;
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message: " + absMessage.type());
+            LOGGER.info("Received message: " + absMessage);
         }
         absMessage.handle(ccs);
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 9851b61..1beff82 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -20,19 +20,14 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -44,8 +39,6 @@
     private static final Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
 
     private final NodeControllerService ncs;
-    private final AtomicLong messageId = new AtomicLong(0);
-    private final Map<Long, IApplicationMessageCallback> callbacks;
     private final IAsterixAppRuntimeContext appContext;
     private final LinkedBlockingQueue<IApplicationMessage> receivedMsgsQ;
     private final ConcurrentFramePool messagingFramePool;
@@ -54,7 +47,6 @@
     public NCMessageBroker(NodeControllerService ncs, MessagingProperties messagingProperties) {
         this.ncs = ncs;
         appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
-        callbacks = new ConcurrentHashMap<>();
         maxMsgSize = messagingProperties.getFrameSize();
         int messagingMemoryBudget = messagingProperties.getFrameSize() * messagingProperties.getFrameCount();
         messagingFramePool = new ConcurrentFramePool(ncs.getId(), messagingMemoryBudget,
@@ -65,27 +57,15 @@
     }
 
     @Override
-    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
-        registerMsgCallback(message, callback);
-        try {
-            ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
-        } catch (Exception e) {
-            handleMsgDeliveryFailure(message);
-            throw e;
-        }
+    public void sendMessageToCC(IApplicationMessage message) throws Exception {
+        ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
     }
 
     @Override
-    public void sendMessageToNC(String nodeId, IApplicationMessage message, IApplicationMessageCallback callback)
+    public void sendMessageToNC(String nodeId, IApplicationMessage message)
             throws Exception {
-        registerMsgCallback(message, callback);
-        try {
-            IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
-            sendMessageToChannel(messagingChannel, message);
-        } catch (Exception e) {
-            handleMsgDeliveryFailure(message);
-            throw e;
-        }
+        IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
+        sendMessageToChannel(messagingChannel, message);
     }
 
     @Override
@@ -95,14 +75,9 @@
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        IApplicationMessage absMessage = (IApplicationMessage) message;
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message: " + absMessage.type());
-        }
-        //if the received message is a response to a sent message, deliver it to the sender
-        IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
-        if (callback != null) {
-            callback.deliverMessageResponse(absMessage);
+            LOGGER.info("Received message: " + absMessage);
         }
         absMessage.handle(ncs);
     }
@@ -111,18 +86,6 @@
         return messagingFramePool;
     }
 
-    private void registerMsgCallback(IApplicationMessage message, IApplicationMessageCallback callback) {
-        if (callback != null) {
-            long uniqueMessageId = messageId.incrementAndGet();
-            message.setId(uniqueMessageId);
-            callbacks.put(uniqueMessageId, callback);
-        }
-    }
-
-    private void handleMsgDeliveryFailure(IApplicationMessage message) {
-        callbacks.remove(message.getId());
-    }
-
     private void sendMessageToChannel(IChannelControlBlock ccb, IApplicationMessage msg) throws IOException {
         byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
         if (serializedMsg.length > maxMsgSize) {
@@ -161,8 +124,8 @@
                     Thread.currentThread().interrupt();
                 } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING) && msg != null) {
-                        LOGGER.log(Level.WARNING, "Could not process message with id: " + msg.getId() + " and type: "
-                                + msg.type(), e);
+                        LOGGER.log(Level.WARNING, "Could not process message : "
+                                + msg, e);
                     } else {
                         if (LOGGER.isLoggable(Level.WARNING)) {
                             LOGGER.log(Level.WARNING, "Could not process message", e);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 046d5c1..c009152 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -86,8 +86,6 @@
 
     public ILibraryManager getLibraryManager();
 
-    public void initializeResourceIdFactory() throws HyracksDataException;
-
     /**
      * Exports the metadata node to the metadata RMI port.
      *
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 552ce22..04e30ff 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -20,22 +20,22 @@
 
 import java.util.List;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
-import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 
-public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
+public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IIndex> {
     /**
-     * @param datasetID
-     * @param resourceID
+     * @param datasetId
+     * @param indexId
      * @return The corresponding index, or null if it is not found in the registered indexes.
      * @throws HyracksDataException
      */
-    IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException;
+    IIndex getIndex(int datasetId, long indexId) throws HyracksDataException;
 
     /**
      * Flushes all open datasets synchronously.
@@ -75,7 +75,7 @@
      * @param datasetID
      * @return
      */
-    ILSMOperationTracker getOperationTracker(int datasetID);
+    PrimaryIndexOperationTracker getOperationTracker(int datasetID);
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 21500b7..5c1d094 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.context;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index 3d112ef..70339f3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -27,7 +27,7 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -45,7 +45,7 @@
     private final IDatasetLifecycleManager datasetLifecycleManager;
     private final int datasetID;
 
-    public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) {
+    public CorrelatedPrefixMergePolicy(IResourceLifecycleManager datasetLifecycleManager, int datasetID) {
         this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
         this.datasetID = datasetID;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index ce405fc..3b65123 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -27,7 +27,7 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -61,7 +61,7 @@
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(ilcm, datasetID);
         policy.configure(properties);
         return policy;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
new file mode 100644
index 0000000..6a2cc56
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
+    private final Map<Long, IndexInfo> indexes;
+    private final int datasetID;
+    private long lastAccess;
+    private int numActiveIOOps;
+    private boolean isExternal;
+    private boolean isRegistered;
+    private boolean memoryAllocated;
+    private boolean durable;
+
+    public DatasetInfo(int datasetID) {
+        this.indexes = new HashMap<>();
+        this.setLastAccess(-1);
+        this.datasetID = datasetID;
+        this.setRegistered(false);
+        this.setMemoryAllocated(false);
+    }
+
+    @Override
+    public void touch() {
+        super.touch();
+        setLastAccess(System.currentTimeMillis());
+    }
+
+    @Override
+    public void untouch() {
+        super.untouch();
+        setLastAccess(System.currentTimeMillis());
+    }
+
+    public synchronized void declareActiveIOOperation() {
+        setNumActiveIOOps(getNumActiveIOOps() + 1);
+    }
+
+    public synchronized void undeclareActiveIOOperation() {
+        setNumActiveIOOps(getNumActiveIOOps() - 1);
+        //notify threads waiting on this dataset info
+        notifyAll();
+    }
+
+    public synchronized Set<ILSMIndex> getDatasetIndexes() {
+        Set<ILSMIndex> datasetIndexes = new HashSet<>();
+        for (IndexInfo iInfo : getIndexes().values()) {
+            if (iInfo.isOpen()) {
+                datasetIndexes.add(iInfo.getIndex());
+            }
+        }
+
+        return datasetIndexes;
+    }
+
+    @Override
+    public int compareTo(DatasetInfo i) {
+        // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
+        //
+        // Example sort order:
+        // -------------------
+        // (F, 0, 70)       <-- largest
+        // (F, 0, 60)
+        // (T, 10, 80)
+        // (T, 10, 70)
+        // (T, 9, 90)
+        // (T, 0, 100)      <-- smallest
+        if (isOpen() && !i.isOpen()) {
+            return -1;
+        } else if (!isOpen() && i.isOpen()) {
+            return 1;
+        } else {
+            if (getReferenceCount() < i.getReferenceCount()) {
+                return -1;
+            } else if (getReferenceCount() > i.getReferenceCount()) {
+                return 1;
+            } else {
+                if (getLastAccess() < i.getLastAccess()) {
+                    return -1;
+                } else if (getLastAccess() > i.getLastAccess()) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof DatasetInfo) {
+            return datasetID == ((DatasetInfo) obj).datasetID;
+        }
+        return false;
+    };
+
+    @Override
+    public int hashCode() {
+        return datasetID;
+    }
+
+    @Override
+    public String toString() {
+        return "DatasetID: " + getDatasetID() + ", isOpen: " + isOpen() + ", refCount: " + getReferenceCount()
+                + ", lastAccess: " + getLastAccess() + ", isRegistered: " + isRegistered() + ", memoryAllocated: "
+                + isMemoryAllocated() + ", isDurable: " + isDurable();
+    }
+
+    public boolean isDurable() {
+        return durable;
+    }
+
+    public int getNumActiveIOOps() {
+        return numActiveIOOps;
+    }
+
+    public void setNumActiveIOOps(int numActiveIOOps) {
+        this.numActiveIOOps = numActiveIOOps;
+    }
+
+    public boolean isExternal() {
+        return isExternal;
+    }
+
+    public void setExternal(boolean isExternal) {
+        this.isExternal = isExternal;
+    }
+
+    public Map<Long, IndexInfo> getIndexes() {
+        return indexes;
+    }
+
+    public boolean isRegistered() {
+        return isRegistered;
+    }
+
+    public void setRegistered(boolean isRegistered) {
+        this.isRegistered = isRegistered;
+    }
+
+    public void setDurable(boolean durable) {
+        this.durable = durable;
+    }
+
+    public int getDatasetID() {
+        return datasetID;
+    }
+
+    public boolean isMemoryAllocated() {
+        return memoryAllocated;
+    }
+
+    public void setMemoryAllocated(boolean memoryAllocated) {
+        this.memoryAllocated = memoryAllocated;
+    }
+
+    public long getLastAccess() {
+        return lastAccess;
+    }
+
+    public void setLastAccess(long lastAccess) {
+        this.lastAccess = lastAccess;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f4eec05..698dfb4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -22,11 +22,9 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
@@ -40,22 +38,16 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
-import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
 public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
+    private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
     private final AsterixStorageProperties storageProperties;
-    private final Map<Integer, DatasetVirtualBufferCaches> datasetVirtualBufferCachesMap;
-    private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
-    private final Map<Integer, DatasetInfo> datasetInfos;
     private final ILocalResourceRepository resourceRepository;
     private final int firstAvilableUserDatasetID;
     private final long capacity;
@@ -63,7 +55,7 @@
     private final ILogManager logManager;
     private final LogRecord logRecord;
     private final int numPartitions;
-    private boolean stopped = false;
+    private volatile boolean stopped = false;
 
     public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
             ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager,
@@ -73,16 +65,13 @@
         this.resourceRepository = resourceRepository;
         this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
         this.numPartitions = numPartitions;
-        datasetVirtualBufferCachesMap = new HashMap<>();
-        datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
-        datasetInfos = new HashMap<Integer, DatasetInfo>();
         capacity = storageProperties.getMemoryComponentGlobalBudget();
         used = 0;
         logRecord = new LogRecord();
     }
 
     @Override
-    public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
+    public synchronized IIndex get(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         int datasetID = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -92,15 +81,11 @@
     @Override
     public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
-        DatasetInfo dsInfo = datasetInfos.get(datasetID);
-        if (dsInfo == null) {
+        DatasetResource datasetResource = datasets.get(datasetID);
+        if (datasetResource == null) {
             return null;
         }
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
-        if (iInfo == null) {
-            return null;
-        }
-        return iInfo.index;
+        return datasetResource.getIndex(resourceID);
     }
 
     @Override
@@ -108,20 +93,11 @@
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null) {
-            dsInfo = getDatasetInfo(did);
+        DatasetResource datasetResource = datasets.get(did);
+        if (datasetResource == null) {
+            datasetResource = getDatasetLifecycle(did);
         }
-        if (!dsInfo.isRegistered) {
-            dsInfo.isExternal = !index.hasMemoryComponents();
-            dsInfo.isRegistered = true;
-            dsInfo.durable = ((ILSMIndex) index).isDurable();
-        }
-
-        if (dsInfo.indexes.containsKey(resourceID)) {
-            throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
-        }
-        dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
+        datasetResource.register(resourceID, index);
     }
 
     public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -146,24 +122,25 @@
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        IndexInfo iInfo = dsInfo == null ? null : dsInfo.indexes.get(resourceID);
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
-        if (dsInfo == null || iInfo == null) {
+        if (dsr == null || iInfo == null) {
             throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
         }
 
-        PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers.get(dsInfo.datasetID);
-        if (iInfo.referenceCount != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
             throw new HyracksDataException("Cannot remove index while it is open. (Dataset reference count = "
-                    + iInfo.referenceCount + ", Operation tracker number of active operations = "
+                    + iInfo.getReferenceCount() + ", Operation tracker number of active operations = "
                     + opTracker.getNumActiveOperations() + ")");
         }
 
         // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
         // First wait for any ongoing IO operations
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     //notification will come from DatasetInfo class (undeclareActiveIOOperation)
                     dsInfo.wait();
@@ -173,16 +150,17 @@
             }
         }
 
-        if (iInfo.isOpen) {
-            ILSMOperationTracker indexOpTracker = iInfo.index.getOperationTracker();
+        if (iInfo.isOpen()) {
+            ILSMOperationTracker indexOpTracker = iInfo.getIndex().getOperationTracker();
             synchronized (indexOpTracker) {
-                iInfo.index.deactivate(false);
+                iInfo.getIndex().deactivate(false);
             }
         }
 
-        dsInfo.indexes.remove(resourceID);
-        if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty() && !dsInfo.isExternal) {
-            removeDatasetFromCache(dsInfo.datasetID);
+        dsInfo.getIndexes().remove(resourceID);
+        if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                && !dsInfo.isExternal()) {
+            removeDatasetFromCache(dsInfo.getDatasetID());
         }
     }
 
@@ -192,29 +170,28 @@
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null || !dsInfo.isRegistered) {
+        DatasetResource dsr = datasets.get(did);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        if (dsInfo == null || !dsInfo.isRegistered()) {
             throw new HyracksDataException(
                     "Failed to open index with resource ID " + resourceID + " since it does not exist.");
         }
 
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+        IndexInfo iInfo = dsInfo.getIndexes().get(resourceID);
         if (iInfo == null) {
             throw new HyracksDataException(
                     "Failed to open index with resource ID " + resourceID + " since it does not exist.");
         }
-        if (!dsInfo.isOpen && !dsInfo.isExternal) {
-            initializeDatasetVirtualBufferCache(did);
-        }
 
-        dsInfo.isOpen = true;
-        dsInfo.touch();
-        if (!iInfo.isOpen) {
-            ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
+        dsr.open(true);
+        dsr.touch();
+
+        if (!iInfo.isOpen()) {
+            ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
             synchronized (opTracker) {
-                iInfo.index.activate();
+                iInfo.getIndex().activate();
             }
-            iInfo.isOpen = true;
+            iInfo.setOpen(true);
         }
         iInfo.touch();
     }
@@ -225,14 +202,15 @@
          * that is not being used (refcount == 0) and has been least recently used, excluding metadata datasets.
          * The sort order defined for DatasetInfo maintains this. See DatasetInfo.compareTo().
          */
-        List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values());
-        Collections.sort(datasetInfosList);
-        for (DatasetInfo dsInfo : datasetInfosList) {
-            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers
-                    .get(dsInfo.datasetID);
-            if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsInfo.referenceCount == 0
-                    && dsInfo.isOpen && dsInfo.datasetID >= firstAvilableUserDatasetID) {
-                closeDataset(dsInfo);
+        List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values());
+        Collections.sort(datasetsResources);
+        for (DatasetResource dsr : datasetsResources) {
+            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+            if (opTracker != null && opTracker.getNumActiveOperations() == 0
+                    && dsr.getDatasetInfo().getReferenceCount() == 0
+                    && dsr.getDatasetInfo().isOpen()
+                    && dsr.getDatasetInfo().getDatasetID() >= getFirstAvilableUserDatasetID()) {
+                closeDataset(dsr.getDatasetInfo());
                 return true;
             }
         }
@@ -240,15 +218,15 @@
     }
 
     private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
-        if (iInfo.isOpen) {
-            ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+        if (iInfo.isOpen()) {
+            ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
+            accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
         }
 
         // Wait for the above flush op.
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     //notification will come from DatasetInfo class (undeclareActiveIOOperation)
                     dsInfo.wait();
@@ -259,16 +237,29 @@
         }
     }
 
+    public DatasetResource getDatasetLifecycle(int did) {
+        DatasetResource dsr = datasets.get(did);
+        if (dsr != null) {
+            return dsr;
+        }
+        synchronized (datasets) {
+            dsr = datasets.get(did);
+            if (dsr == null) {
+                DatasetInfo dsInfo = new DatasetInfo(did);
+                PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo);
+                DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
+                        getFirstAvilableUserDatasetID(),
+                        getNumPartitions());
+                dsr = new DatasetResource(dsInfo, opTracker, vbcs);
+                datasets.put(did, dsr);
+            }
+            return dsr;
+        }
+    }
+
     @Override
     public DatasetInfo getDatasetInfo(int datasetID) {
-        synchronized (datasetInfos) {
-            DatasetInfo dsInfo = datasetInfos.get(datasetID);
-            if (dsInfo == null) {
-                dsInfo = new DatasetInfo(datasetID);
-                datasetInfos.put(datasetID, dsInfo);
-            }
-            return dsInfo;
-        }
+        return getDatasetLifecycle(datasetID).getDatasetInfo();
     }
 
     @Override
@@ -276,35 +267,34 @@
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-
-        DatasetInfo dsInfo = datasetInfos.get(did);
-        if (dsInfo == null) {
+        DatasetResource dsr = datasets.get(did);
+        if (dsr == null) {
             throw new HyracksDataException("No index found with resourceID " + resourceID);
         }
-        IndexInfo iInfo = dsInfo.indexes.get(resourceID);
+        IndexInfo iInfo = dsr.getIndexInfo(resourceID);
         if (iInfo == null) {
             throw new HyracksDataException("No index found with resourceID " + resourceID);
         }
         iInfo.untouch();
-        dsInfo.untouch();
+        dsr.untouch();
     }
 
     @Override
-    public synchronized List<IIndex> getOpenIndexes() {
+    public synchronized List<IIndex> getOpenResources() {
         List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
         List<IIndex> openIndexes = new ArrayList<IIndex>();
         for (IndexInfo iInfo : openIndexesInfo) {
-            openIndexes.add(iInfo.index);
+            openIndexes.add(iInfo.getIndex());
         }
         return openIndexes;
     }
 
     @Override
     public synchronized List<IndexInfo> getOpenIndexesInfo() {
-        List<IndexInfo> openIndexesInfo = new ArrayList<IndexInfo>();
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                if (iInfo.isOpen) {
+        List<IndexInfo> openIndexesInfo = new ArrayList<>();
+        for (DatasetResource dsr : datasets.values()) {
+            for (IndexInfo iInfo : dsr.getIndexes().values()) {
+                if (iInfo.isOpen()) {
                     openIndexesInfo.add(iInfo);
                 }
             }
@@ -313,46 +303,23 @@
     }
 
     private DatasetVirtualBufferCaches getVirtualBufferCaches(int datasetID) {
-        synchronized (datasetVirtualBufferCachesMap) {
-            DatasetVirtualBufferCaches vbcs = datasetVirtualBufferCachesMap.get(datasetID);
-            if (vbcs == null) {
-                vbcs = initializeDatasetVirtualBufferCache(datasetID);
-            }
-            return vbcs;
-        }
+        return getDatasetLifecycle(datasetID).getVirtualBufferCaches();
     }
 
     @Override
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum) {
         DatasetVirtualBufferCaches dvbcs = getVirtualBufferCaches(datasetID);
-        return dvbcs.getVirtualBufferCaches(ioDeviceNum);
+        return dvbcs.getVirtualBufferCaches(this, ioDeviceNum);
     }
 
     private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
         deallocateDatasetMemory(datasetID);
-        datasetInfos.remove(datasetID);
-        datasetVirtualBufferCachesMap.remove(datasetID);
-        datasetOpTrackers.remove(datasetID);
-    }
-
-    private DatasetVirtualBufferCaches initializeDatasetVirtualBufferCache(int datasetID) {
-        synchronized (datasetVirtualBufferCachesMap) {
-            DatasetVirtualBufferCaches dvbcs = new DatasetVirtualBufferCaches(datasetID);
-            datasetVirtualBufferCachesMap.put(datasetID, dvbcs);
-            return dvbcs;
-        }
+        datasets.remove(datasetID);
     }
 
     @Override
-    public ILSMOperationTracker getOperationTracker(int datasetID) {
-        synchronized (datasetOpTrackers) {
-            ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
-            if (opTracker == null) {
-                opTracker = new PrimaryIndexOperationTracker(datasetID, logManager, getDatasetInfo(datasetID));
-                datasetOpTrackers.put(datasetID, opTracker);
-            }
-            return opTracker;
-        }
+    public PrimaryIndexOperationTracker getOperationTracker(int datasetID) {
+        return datasets.get(datasetID).getOpTracker();
     }
 
     private void validateDatasetLifecycleManagerState() throws HyracksDataException {
@@ -361,145 +328,6 @@
         }
     }
 
-    private static abstract class Info {
-        protected int referenceCount;
-        protected boolean isOpen;
-
-        public Info() {
-            referenceCount = 0;
-            isOpen = false;
-        }
-
-        public void touch() {
-            ++referenceCount;
-        }
-
-        public void untouch() {
-            --referenceCount;
-        }
-    }
-
-    public static class IndexInfo extends Info {
-        private final ILSMIndex index;
-        private final long resourceId;
-        private final int datasetId;
-
-        public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
-            this.index = index;
-            this.datasetId = datasetId;
-            this.resourceId = resourceId;
-        }
-
-        public ILSMIndex getIndex() {
-            return index;
-        }
-
-        public long getResourceId() {
-            return resourceId;
-        }
-
-        public int getDatasetId() {
-            return datasetId;
-        }
-    }
-
-    public static class DatasetInfo extends Info implements Comparable<DatasetInfo> {
-        private final Map<Long, IndexInfo> indexes;
-        private final int datasetID;
-        private long lastAccess;
-        private int numActiveIOOps;
-        private boolean isExternal;
-        private boolean isRegistered;
-        private boolean memoryAllocated;
-        private boolean durable;
-
-        public DatasetInfo(int datasetID) {
-            this.indexes = new HashMap<Long, IndexInfo>();
-            this.lastAccess = -1;
-            this.datasetID = datasetID;
-            this.isRegistered = false;
-            this.memoryAllocated = false;
-        }
-
-        @Override
-        public void touch() {
-            super.touch();
-            lastAccess = System.currentTimeMillis();
-        }
-
-        @Override
-        public void untouch() {
-            super.untouch();
-            lastAccess = System.currentTimeMillis();
-        }
-
-        public synchronized void declareActiveIOOperation() {
-            numActiveIOOps++;
-        }
-
-        public synchronized void undeclareActiveIOOperation() {
-            numActiveIOOps--;
-            //notify threads waiting on this dataset info
-            notifyAll();
-        }
-
-        public synchronized Set<ILSMIndex> getDatasetIndexes() {
-            Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
-            for (IndexInfo iInfo : indexes.values()) {
-                if (iInfo.isOpen) {
-                    datasetIndexes.add(iInfo.index);
-                }
-            }
-
-            return datasetIndexes;
-        }
-
-        @Override
-        public int compareTo(DatasetInfo i) {
-            // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
-            //
-            // Example sort order:
-            // -------------------
-            // (F, 0, 70)       <-- largest
-            // (F, 0, 60)
-            // (T, 10, 80)
-            // (T, 10, 70)
-            // (T, 9, 90)
-            // (T, 0, 100)      <-- smallest
-            if (isOpen && !i.isOpen) {
-                return -1;
-            } else if (!isOpen && i.isOpen) {
-                return 1;
-            } else {
-                if (referenceCount < i.referenceCount) {
-                    return -1;
-                } else if (referenceCount > i.referenceCount) {
-                    return 1;
-                } else {
-                    if (lastAccess < i.lastAccess) {
-                        return -1;
-                    } else if (lastAccess > i.lastAccess) {
-                        return 1;
-                    } else {
-                        return 0;
-                    }
-                }
-            }
-
-        }
-
-        @Override
-        public String toString() {
-            return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
-                    + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
-                    + memoryAllocated + ", isDurable: " + durable;
-        }
-
-        public boolean isDurable() {
-            return durable;
-        }
-    }
-
     @Override
     public synchronized void start() {
         used = 0;
@@ -507,30 +335,29 @@
 
     @Override
     public synchronized void flushAllDatasets() throws HyracksDataException {
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            flushDatasetOpenIndexes(dsInfo, false);
+        for (DatasetResource dsr : datasets.values()) {
+            flushDatasetOpenIndexes(dsr.getDatasetInfo(), false);
         }
     }
 
     @Override
     public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
-        DatasetInfo datasetInfo = datasetInfos.get(datasetId);
-        if (datasetInfo != null) {
-            flushDatasetOpenIndexes(datasetInfo, asyncFlush);
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr != null) {
+            flushDatasetOpenIndexes(dsr.getDatasetInfo(), asyncFlush);
         }
     }
 
     @Override
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
         //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(
-                    dsInfo.datasetID);
+        for (DatasetResource dsr : datasets.values()) {
+            PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
             synchronized (opTracker) {
-                for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
+                for (IndexInfo iInfo : dsr.getIndexes().values()) {
+                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
                             .getIOOperationCallback();
-                    if (!(((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty()
+                    if (!(((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()
                             || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
                             || opTracker.isFlushOnExit())) {
                         long firstLSN = ioCallback.getFirstLSN();
@@ -552,10 +379,10 @@
      * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
      */
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
-        if (!dsInfo.isExternal && dsInfo.durable) {
+        if (!dsInfo.isExternal() && dsInfo.isDurable()) {
             synchronized (logRecord) {
-                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, logManager.getNodeId(),
-                        dsInfo.indexes.size());
+                TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null, logManager.getNodeId(),
+                        dsInfo.getIndexes().size());
                 try {
                     logManager.log(logRecord);
                 } catch (ACIDException e) {
@@ -569,22 +396,22 @@
                     throw new HyracksDataException(e);
                 }
             }
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
                 //update resource lsn
-                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.index
+                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
                         .getIOOperationCallback();
                 ioOpCallback.updateLastLSN(logRecord.getLSN());
             }
         }
 
         if (asyncFlush) {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+                ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
-                accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
+                accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
             }
         } else {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+            for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
                 // TODO: This is not efficient since we flush the indexes sequentially.
                 // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
                 // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
@@ -596,7 +423,7 @@
     private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
         // First wait for any ongoing IO operations
         synchronized (dsInfo) {
-            while (dsInfo.numActiveIOOps > 0) {
+            while (dsInfo.getNumActiveIOOps() > 0) {
                 try {
                     dsInfo.wait();
                 } catch (InterruptedException e) {
@@ -609,34 +436,33 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        for (IndexInfo iInfo : dsInfo.indexes.values()) {
-            if (iInfo.isOpen) {
-                ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
+        for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+            if (iInfo.isOpen()) {
+                ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
                 synchronized (opTracker) {
-                    iInfo.index.deactivate(false);
+                    iInfo.getIndex().deactivate(false);
                 }
-                iInfo.isOpen = false;
+                iInfo.setOpen(false);
             }
-            assert iInfo.referenceCount == 0;
         }
-        removeDatasetFromCache(dsInfo.datasetID);
-        dsInfo.isOpen = false;
+        removeDatasetFromCache(dsInfo.getDatasetID());
+        dsInfo.setOpen(false);
     }
 
     @Override
     public synchronized void closeAllDatasets() throws HyracksDataException {
-        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
-        for (DatasetInfo dsInfo : openDatasets) {
-            closeDataset(dsInfo);
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            closeDataset(dsr.getDatasetInfo());
         }
     }
 
     @Override
     public synchronized void closeUserDatasets() throws HyracksDataException {
-        List<DatasetInfo> openDatasets = new ArrayList<>(datasetInfos.values());
-        for (DatasetInfo dsInfo : openDatasets) {
-            if (dsInfo.datasetID >= firstAvilableUserDatasetID) {
-                closeDataset(dsInfo);
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            if (dsr.getDatasetID() >= getFirstAvilableUserDatasetID()) {
+                closeDataset(dsr.getDatasetInfo());
             }
         }
     }
@@ -652,9 +478,7 @@
 
         closeAllDatasets();
 
-        datasetVirtualBufferCachesMap.clear();
-        datasetOpTrackers.clear();
-        datasetInfos.clear();
+        datasets.clear();
         stopped = true;
     }
 
@@ -673,57 +497,43 @@
 
         sb.append("[Datasets]\n");
         sb.append(String.format(dsHeaderFormat, "DatasetID", "Open", "Reference Count", "Last Access"));
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
+        for (DatasetResource dsr : datasets.values()) {
+            DatasetInfo dsInfo = dsr.getDatasetInfo();
             sb.append(
-                    String.format(dsFormat, dsInfo.datasetID, dsInfo.isOpen, dsInfo.referenceCount, dsInfo.lastAccess));
+                    String.format(dsFormat, dsInfo.getDatasetID(), dsInfo.isOpen(), dsInfo.getReferenceCount(),
+                            dsInfo.getLastAccess()));
         }
         sb.append("\n");
 
         sb.append("[Indexes]\n");
         sb.append(String.format(idxHeaderFormat, "DatasetID", "ResourceID", "Open", "Reference Count", "Index"));
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            for (Map.Entry<Long, IndexInfo> entry : dsInfo.indexes.entrySet()) {
+        for (DatasetResource dsr : datasets.values()) {
+            DatasetInfo dsInfo = dsr.getDatasetInfo();
+            for (Map.Entry<Long, IndexInfo> entry : dsInfo.getIndexes().entrySet()) {
                 IndexInfo iInfo = entry.getValue();
-                sb.append(String.format(idxFormat, dsInfo.datasetID, entry.getKey(), iInfo.isOpen, iInfo.referenceCount,
-                        iInfo.index));
+                sb.append(String.format(idxFormat, dsInfo.getDatasetID(), entry.getKey(), iInfo.isOpen(),
+                        iInfo.getReferenceCount(),
+                        iInfo.getIndex()));
             }
         }
-
         outputStream.write(sb.toString().getBytes());
     }
 
-    private synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetId);
-        if (dsInfo == null) {
+    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr == null) {
             throw new HyracksDataException(
                     "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
-        synchronized (dsInfo) {
-            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
-            if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
-                long additionalSize = getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
-                while (used + additionalSize > capacity) {
-                    if (!evictCandidateDataset()) {
-                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
-                                + " memory since memory budget would be exceeded.");
-                    }
-                }
-                used += additionalSize;
-                dsInfo.memoryAllocated = true;
-            }
-        }
-    }
-
-    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (dsInfo == null) {
             throw new HyracksDataException(
                     "Failed to deallocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
         synchronized (dsInfo) {
-            if (dsInfo.isOpen && dsInfo.memoryAllocated) {
-                used -= getVirtualBufferCaches(dsInfo.datasetID).getTotalSize();
-                dsInfo.memoryAllocated = false;
+            if (dsInfo.isOpen() && dsInfo.isMemoryAllocated()) {
+                used -= getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
+                dsInfo.setMemoryAllocated(false);
             }
         }
     }
@@ -731,54 +541,34 @@
     @Override
     public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
         //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
-        int did = Integer.parseInt(resourcePath);
-        allocateDatasetMemory(did);
-    }
-
-    private class DatasetVirtualBufferCaches {
-        private final int datasetID;
-        private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
-
-        public DatasetVirtualBufferCaches(int datasetID) {
-            this.datasetID = datasetID;
+        int datasetId = Integer.parseInt(resourcePath);
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr == null) {
+            throw new HyracksDataException(
+                    "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
         }
-
-        private List<IVirtualBufferCache> initializeVirtualBufferCaches(int ioDeviceNum) {
-            assert ioDeviceVirtualBufferCaches.size() < numPartitions;
-            int numPages = datasetID < firstAvilableUserDatasetID
-                    ? storageProperties.getMetadataMemoryComponentNumPages()
-                    : storageProperties.getMemoryComponentNumPages();
-            List<IVirtualBufferCache> vbcs = new ArrayList<>();
-            for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
-                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
-                        new VirtualBufferCache(
-                                new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
-                                        Integer.toString(datasetID)),
-                                storageProperties.getMemoryComponentPageSize(),
-                                numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
-                vbcs.add(vbc);
-            }
-            ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
-            return vbcs;
-        }
-
-        public List<IVirtualBufferCache> getVirtualBufferCaches(int ioDeviceNum) {
-            synchronized (ioDeviceVirtualBufferCaches) {
-                List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
-                if (vbcs == null) {
-                    vbcs = initializeVirtualBufferCaches(ioDeviceNum);
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        synchronized (dsInfo) {
+            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
+            if (!dsInfo.isMemoryAllocated() && !dsInfo.isExternal()) {
+                long additionalSize = getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
+                while (used + additionalSize > capacity) {
+                    if (!evictCandidateDataset()) {
+                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
+                                + " memory since memory budget would be exceeded.");
+                    }
                 }
-                return vbcs;
+                used += additionalSize;
+                dsInfo.setMemoryAllocated(true);
             }
         }
-
-        public long getTotalSize() {
-            int numPages = datasetID < firstAvilableUserDatasetID
-                    ? storageProperties.getMetadataMemoryComponentNumPages()
-                    : storageProperties.getMemoryComponentNumPages();
-
-            return storageProperties.getMemoryComponentPageSize() * numPages;
-        }
     }
 
+    public int getFirstAvilableUserDatasetID() {
+        return firstAvilableUserDatasetID;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
new file mode 100644
index 0000000..01f2e2b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+/**
+ * A dataset can be in one of two states { EVICTED , LOADED }.
+ * When a dataset is created, it is in the EVICTED state. In the EVICTED state, the allowed operations are:
+ * 1. DELETE: delete the dataset completely.
+ * 2. LOAD: move the dataset to the LOADED state.
+ * When a dataset is in the LOADED state, the allowed operations are:
+ * 1. OPEN: increment the open counter.
+ * 2. ALLOCATE RESOURCES (memory)
+ * 3. DEALLOCATE RESOURCES (memory)
+ * 4. CLOSE: decrement the open counter.
+ * 5. EVICT: deallocate resources and unload the dataset moving it to the EVICTED state
+ */
+public class DatasetResource implements Comparable<DatasetResource> {
+    private final DatasetInfo datasetInfo;
+    private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
+    private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
+
+    public DatasetResource(DatasetInfo datasetInfo,
+            PrimaryIndexOperationTracker datasetPrimaryOpTracker,
+            DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
+        this.datasetInfo = datasetInfo;
+        this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
+        this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
+    }
+
+    public boolean isRegistered() {
+        return datasetInfo.isRegistered();
+    }
+
+    public IndexInfo getIndexInfo(long resourceID) {
+        return datasetInfo.getIndexes().get(resourceID);
+    }
+
+    public boolean isOpen() {
+        return datasetInfo.isOpen();
+    }
+
+    public boolean isExternal() {
+        return datasetInfo.isExternal();
+    }
+
+    public void open(boolean open) {
+        datasetInfo.setOpen(open);
+    }
+
+    public void touch() {
+        datasetInfo.touch();
+    }
+
+    public void untouch() {
+        datasetInfo.untouch();
+    }
+
+    public DatasetVirtualBufferCaches getVirtualBufferCaches() {
+        return datasetVirtualBufferCaches;
+    }
+
+    public IIndex getIndex(long resourceID) {
+        IndexInfo iInfo = getIndexInfo(resourceID);
+        if (iInfo == null) {
+            return null;
+        }
+        return iInfo.getIndex();
+    }
+
+    public void register(long resourceID, IIndex index) throws HyracksDataException {
+        if (!datasetInfo.isRegistered()) {
+            synchronized (datasetInfo) {
+                if (!datasetInfo.isRegistered()) {
+                    datasetInfo.setExternal(!index.hasMemoryComponents());
+                    datasetInfo.setRegistered(true);
+                    datasetInfo.setDurable(((ILSMIndex) index).isDurable());
+                }
+            }
+        }
+        if (datasetInfo.getIndexes().containsKey(resourceID)) {
+            throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
+        }
+        datasetInfo.getIndexes().put(resourceID,
+                new IndexInfo((ILSMIndex) index, datasetInfo.getDatasetID(), resourceID));
+    }
+
+    public DatasetInfo getDatasetInfo() {
+        return datasetInfo;
+    }
+
+    public PrimaryIndexOperationTracker getOpTracker() {
+        return datasetPrimaryOpTracker;
+    }
+
+    @Override
+    public int compareTo(DatasetResource o) {
+        return datasetInfo.compareTo(o.datasetInfo);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof DatasetResource) {
+            return datasetInfo.equals(((DatasetResource) obj).datasetInfo);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return datasetInfo.hashCode();
+    }
+
+    public Map<Long, IndexInfo> getIndexes() {
+        return datasetInfo.getIndexes();
+    }
+
+    public int getDatasetID() {
+        return datasetInfo.getDatasetID();
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
new file mode 100644
index 0000000..df85509
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
+import org.apache.hyracks.storage.common.IResourceMemoryManager;
+import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
+
+public class DatasetVirtualBufferCaches {
+    private final int datasetID;
+    private final AsterixStorageProperties storageProperties;
+    private final int firstAvilableUserDatasetID;
+    private final int numPartitions;
+    private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
+
+    public DatasetVirtualBufferCaches(int datasetID, AsterixStorageProperties storageProperties,
+            int firstAvilableUserDatasetID, int numPartitions) {
+        this.datasetID = datasetID;
+        this.storageProperties = storageProperties;
+        this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
+        this.numPartitions = numPartitions;
+    }
+
+    public List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
+            int ioDeviceNum) {
+        int numPages = datasetID < firstAvilableUserDatasetID
+                ? storageProperties.getMetadataMemoryComponentNumPages()
+                : storageProperties.getMemoryComponentNumPages();
+        List<IVirtualBufferCache> vbcs = new ArrayList<>();
+        for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+            MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+                    new VirtualBufferCache(
+                            new ResourceHeapBufferAllocator(memoryManager,
+                                    Integer.toString(datasetID)),
+                            storageProperties.getMemoryComponentPageSize(),
+                            numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
+            vbcs.add(vbc);
+        }
+        ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
+        return vbcs;
+    }
+
+    public List<IVirtualBufferCache> getVirtualBufferCaches(IResourceMemoryManager memoryManager, int ioDeviceNum) {
+        synchronized (ioDeviceVirtualBufferCaches) {
+            List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
+            if (vbcs == null) {
+                vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum);
+            }
+            return vbcs;
+        }
+    }
+
+    public long getTotalSize() {
+        int numPages = datasetID < firstAvilableUserDatasetID
+                ? storageProperties.getMetadataMemoryComponentNumPages()
+                : storageProperties.getMemoryComponentNumPages();
+        return storageProperties.getMemoryComponentPageSize() * ((long) numPages);
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
new file mode 100644
index 0000000..34ccce0
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class IndexInfo extends Info {
+    private final ILSMIndex index;
+    private final long resourceId;
+    private final int datasetId;
+
+    public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
+        this.index = index;
+        this.datasetId = datasetId;
+        this.resourceId = resourceId;
+    }
+
+    public ILSMIndex getIndex() {
+        return index;
+    }
+
+    public long getResourceId() {
+        return resourceId;
+    }
+
+    public int getDatasetId() {
+        return datasetId;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
new file mode 100644
index 0000000..999eb34
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/Info.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+public abstract class Info {
+    private int referenceCount;
+    private boolean isOpen;
+
+    public Info() {
+        referenceCount = 0;
+        isOpen = false;
+    }
+
+    public void touch() {
+        ++referenceCount;
+    }
+
+    public void untouch() {
+        --referenceCount;
+    }
+
+    public int getReferenceCount() {
+        return referenceCount;
+    }
+
+    public void setReferenceCount(int referenceCount) {
+        this.referenceCount = referenceCount;
+    }
+
+    public boolean isOpen() {
+        return isOpen;
+    }
+
+    public void setOpen(boolean isOpen) {
+        this.isOpen = isOpen;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 71c30d5..6350d73 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -22,7 +22,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
deleted file mode 100644
index 5737957..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-
-public abstract class AbstractApplicationMessage implements IApplicationMessage {
-    private static final long serialVersionUID = 1L;
-    protected long id;
-
-    @Override
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    @Override
-    public long getId() {
-        return id;
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index b93082d..dbd2139 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -22,31 +22,11 @@
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.service.IControllerService;
 
+@FunctionalInterface
 public interface IApplicationMessage extends IMessage {
 
     /**
-     * Sets a unique message id that identifies this message within an NC.
-     * This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
-     * when the callback is not null to notify the sender when the response to that message is received.
-     *
-     * @param messageId
-     */
-    public void setId(long messageId);
-
-    /**
-     * @return The unique message id if it has been set, otherwise 0.
-     */
-    public long getId();
-
-    /**
      * handle the message upon delivery
      */
-    public void handle(IControllerService cs) throws HyracksDataException;
-
-    /**
-     * get a string representation for the message type
-     *
-     * @return
-     */
-    public String type();
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
deleted file mode 100644
index 3bad5fb..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessageCallback.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging.api;
-
-public interface IApplicationMessageCallback {
-
-    /**
-     * Notifies the message sender when the response has been received.
-     *
-     * @param message
-     *            The response message
-     */
-    public void deliverMessageResponse(IApplicationMessage message);
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 2290b93..ced2b6d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -21,7 +21,6 @@
 import org.apache.hyracks.api.messages.IMessageBroker;
 
 public interface ICCMessageBroker extends IMessageBroker {
-    public static final long NO_CALLBACK_MESSAGE_ID = -1;
 
     /**
      * Sends the passed message to the specified {@code nodeId}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index f01d0c3..707f864 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,7 @@
      * @param callback
      * @throws Exception
      */
-    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+    public void sendMessageToCC(IApplicationMessage message) throws Exception;
 
     /**
      * Sends application message from this NC to another NC.
@@ -38,7 +38,7 @@
      * @param callback
      * @throws Exception
      */
-    public void sendMessageToNC(String nodeId, IApplicationMessage message, IApplicationMessageCallback callback)
+    public void sendMessageToNC(String nodeId, IApplicationMessage message)
             throws Exception;
 
     /**
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index d0dd3fa..b66bcf8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public interface IAsterixAppRuntimeContextProvider {
 
@@ -49,8 +48,6 @@
 
     public ILocalResourceRepository getLocalResourceRepository();
 
-    public IResourceIdFactory getResourceIdFactory();
-
     public IIOManager getIOManager();
 
     public IAsterixAppRuntimeContext getAppContext();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index c929b41..6de796e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -407,7 +407,7 @@
             throws ACIDException, HyracksDataException, IndexException {
         long resourceID = metadataIndex.getResourceID();
         String resourceName = metadataIndex.getFile().toString();
-        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
+        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         try {
             datasetLifecycleManager.open(resourceName);
 
@@ -674,7 +674,7 @@
             throws ACIDException, HyracksDataException, IndexException {
         long resourceID = metadataIndex.getResourceID();
         String resourceName = metadataIndex.getFile().toString();
-        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
+        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
         try {
             datasetLifecycleManager.open(resourceName);
             // prepare a Callback for logging
@@ -1066,7 +1066,7 @@
         try {
             IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
             String resourceName = index.getFile().toString();
-            IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            IIndex indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
@@ -1088,7 +1088,7 @@
             datasetLifecycleManager.close(resourceName);
 
             index = MetadataPrimaryIndexes.DATASET_DATASET;
-            indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
@@ -1113,7 +1113,7 @@
             datasetLifecycleManager.close(resourceName);
 
             index = MetadataPrimaryIndexes.INDEX_DATASET;
-            indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
@@ -1149,7 +1149,7 @@
             throw new MetadataException("No file for Index " + index.getDataverseName() + "." + index.getIndexName());
         }
         String resourceName = index.getFile().toString();
-        IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+        IIndex indexInstance = datasetLifecycleManager.get(resourceName);
         datasetLifecycleManager.open(resourceName);
         IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
@@ -1187,7 +1187,7 @@
         int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
         try {
             String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
-            IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            IIndex indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             try {
                 IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 2629fea..55b1045 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,9 +32,9 @@
 import org.apache.asterix.common.api.ILocalResourceMetadata;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.context.BaseOperationTracker;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
@@ -59,10 +59,10 @@
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Node;
-import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.metadata.entities.Node;
+import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.BuiltinType;
@@ -319,6 +319,8 @@
         String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
         FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
 
+        // this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for
+        // a dataset that was not yet created
         List<IVirtualBufferCache> virtualBufferCaches = runtimeContext.getDatasetLifecycleManager()
                 .getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum());
         ITypeTraits[] typeTraits = index.getTypeTraits();
@@ -360,7 +362,7 @@
                         + " to intialize as a new instance. (WARNING: all data will be lost.)");
             }
             resourceID = resource.getResourceId();
-            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
+            lsmBtree = (LSMBTree) dataLifecycleManager.get(absolutePath);
             if (lsmBtree == null) {
                 lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
                         typeTraits, comparatorFactories, bloomFilterKeyFields,
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 5a88a9f..c0d8320 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -46,7 +46,7 @@
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
+import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -59,7 +59,6 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -70,6 +69,7 @@
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
index 2eb1be9..7c386d5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
@@ -18,9 +18,9 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 
-public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage {
+public abstract class AbstractFailbackPlanMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     protected final long planId;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
index c96bcb8..5cb1a6a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -55,7 +55,8 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
+        sb.append(CompleteFailbackRequestMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
         sb.append(" Node ID: " + nodeId);
         sb.append(" Partitions: " + partitions);
         return sb.toString();
@@ -78,7 +79,7 @@
             CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId,
                     requestId, partitions);
             try {
-                broker.sendMessageToCC(reponse, null);
+                broker.sendMessageToCC(reponse);
             } catch (Exception e) {
                 LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
                 hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -88,9 +89,4 @@
             throw hde;
         }
     }
-
-    @Override
-    public String type() {
-        return "COMPLETE_FAILBACK_REQUEST";
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index 3c4b58c..73e5fd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -41,7 +41,8 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
+        sb.append(CompleteFailbackResponseMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
         sb.append(" Partitions: " + partitions);
         return sb.toString();
     }
@@ -50,9 +51,4 @@
     public void handle(IControllerService cs) throws HyracksDataException {
         AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(this);
     }
-
-    @Override
-    public String type() {
-        return "COMPLETE_FAILBACK_RESPONSE";
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
index e3b9fbe..c112366 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -64,7 +64,8 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
+        sb.append(PreparePartitionsFailbackRequestMessage.class.getSimpleName());
+        sb.append(" Plan ID: " + planId);
         sb.append(" Partitions: " + partitions);
         sb.append(" releaseMetadataNode: " + releaseMetadataNode);
         return sb.toString();
@@ -109,15 +110,10 @@
         PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId,
                 requestId, partitions);
         try {
-            broker.sendMessageToCC(reponse, null);
+            broker.sendMessageToCC(reponse);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
             throw ExceptionUtils.convertToHyracksDataException(e);
         }
     }
-
-    @Override
-    public String type() {
-        return "PREPARE_PARTITIONS_FAILBACK_REQUEST";
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index 2e52773..299121a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -44,7 +44,7 @@
     }
 
     @Override
-    public String type() {
-        return "PREPARE_PARTITIONS_FAILBACK_RESPONSE";
+    public String toString() {
+        return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
index 2aa4746..fc55968 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.runtime.message;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.event.schema.cluster.Node;
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReplicaEventMessage extends AbstractApplicationMessage {
+public class ReplicaEventMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private final String nodeId;
@@ -66,7 +66,7 @@
     }
 
     @Override
-    public String type() {
-        return "REPLICA_EVENT";
+    public String toString() {
+        return ReplicaEventMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index 1604e29..f9f6233 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -23,7 +23,7 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
@@ -32,7 +32,7 @@
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+public class ReportMaxResourceIdMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
     private final long maxResourceId;
@@ -62,7 +62,7 @@
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
         try {
-            ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg, null);
+            ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
             throw ExceptionUtils.convertToHyracksDataException(e);
@@ -70,7 +70,7 @@
     }
 
     @Override
-    public String type() {
-        return "REPORT_MAX_RESOURCE_ID_RESPONSE";
+    public String toString() {
+        return ReportMaxResourceIdMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
index 3e2becf..203104e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
+public class ReportMaxResourceIdRequestMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
 
     @Override
@@ -32,7 +32,7 @@
     }
 
     @Override
-    public String type() {
-        return "REPORT_MAX_RESOURCE_ID_REQUEST";
+    public String toString() {
+        return ReportMaxResourceIdRequestMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 5e7d808..32d3f64 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -21,7 +21,7 @@
 import java.util.Set;
 
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+public class ResourceIdRequestMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
     private final String src;
 
@@ -43,7 +43,6 @@
             ICCMessageBroker broker =
                     (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
             ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
-            reponse.setId(id);
             if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
                 reponse.setResourceId(-1);
                 reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
@@ -66,7 +65,6 @@
             throws Exception {
         Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
         ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
-        msg.setId(ICCMessageBroker.NO_CALLBACK_MESSAGE_ID);
         for (String nodeId : getParticipantNodes) {
             if (!resourceIdManager.reported(nodeId)) {
                 broker.sendApplicationMessageToNC(msg, nodeId);
@@ -75,7 +73,7 @@
     }
 
     @Override
-    public String type() {
-        return "RESOURCE_ID_REQUEST";
+    public String toString() {
+        return ReportMaxResourceIdRequestMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index 62c2163..d4cc022 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -18,11 +18,14 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.runtime.transaction.GlobalResourceIdFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
+public class ResourceIdRequestResponseMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
 
     private long resourceId;
@@ -45,13 +48,15 @@
     }
 
     @Override
-    public void handle(IControllerService cs) throws HyracksDataException {
-        // Do nothing. for this message, the callback handles it, we probably should get rid of callbacks and
-        // instead, use the handle in the response to perform callback action
+    public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext asterixNcAppRuntimeCtx =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        ((GlobalResourceIdFactory) asterixNcAppRuntimeCtx.getResourceIdFactory()).addNewIds(this);
     }
 
     @Override
-    public String type() {
-        return "RESOURCE_ID_RESPONSE";
+    public String toString() {
+        return ResourceIdRequestResponseMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
index b18a879..e877f52 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -23,13 +23,13 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+public class TakeoverMetadataNodeRequestMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
@@ -51,7 +51,7 @@
             TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
                     appContext.getTransactionSubsystem().getId());
             try {
-                broker.sendMessageToCC(reponse, null);
+                broker.sendMessageToCC(reponse);
             } catch (Exception e) {
                 LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
                 hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -63,7 +63,7 @@
     }
 
     @Override
-    public String type() {
-        return "TAKEOVER_METADATA_NODE_REQUEST";
+    public String toString() {
+        return TakeoverMetadataNodeRequestMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index f7016bc..e18fc8d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+public class TakeoverMetadataNodeResponseMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private final String nodeId;
@@ -42,7 +42,7 @@
     }
 
     @Override
-    public String type() {
-        return "TAKEOVER_METADATA_NODE_RESPONSE";
+    public String toString() {
+        return TakeoverMetadataNodeResponseMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
index 3b91084..e024eed 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -25,14 +25,14 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+public class TakeoverPartitionsRequestMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
@@ -61,7 +61,8 @@
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Request ID: " + requestId);
+        sb.append(TakeoverPartitionsRequestMessage.class.getSimpleName());
+        sb.append(" Request ID: " + requestId);
         sb.append(" Node ID: " + nodeId);
         sb.append(" Partitions: ");
         for (Integer partitionId : partitions) {
@@ -92,7 +93,7 @@
                 TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
                         appContext.getTransactionSubsystem().getId(), partitions);
                 try {
-                    broker.sendMessageToCC(reponse, null);
+                    broker.sendMessageToCC(reponse);
                 } catch (Exception e) {
                     LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
                     hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
@@ -103,9 +104,4 @@
             }
         }
     }
-
-    @Override
-    public String type() {
-        return "TAKEOVER_PARTITIONS_REQUEST";
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index 9ec71b7..1c57a2a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.runtime.message;
 
-import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
-public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+public class TakeoverPartitionsResponseMessage implements IApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private final Integer[] partitions;
@@ -54,7 +54,7 @@
     }
 
     @Override
-    public String type() {
-        return "TAKEOVER_PARTITIONS_RESPONSE";
+    public String toString() {
+        return TakeoverPartitionsResponseMessage.class.getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 4cb65c2..a8b61f7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -20,8 +20,6 @@
 
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
 import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -34,10 +32,10 @@
  * A resource id factory that generates unique resource ids across all NCs by requesting
  * unique ids from the cluster controller.
  */
-public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
+public class GlobalResourceIdFactory implements IResourceIdFactory {
 
     private final IApplicationContext appCtx;
-    private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
+    private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
     private final String nodeId;
 
     public GlobalResourceIdFactory(IApplicationContext appCtx) {
@@ -46,6 +44,10 @@
         this.nodeId = ((NodeControllerService) appCtx.getControllerService()).getApplicationContext().getNodeId();
     }
 
+    public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+        resourceIdResponseQ.put(resourceIdResponse);
+    }
+
     @Override
     public long createId() throws HyracksDataException {
         try {
@@ -54,15 +56,15 @@
             if (!resourceIdResponseQ.isEmpty()) {
                 synchronized (resourceIdResponseQ) {
                     if (!resourceIdResponseQ.isEmpty()) {
-                        reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+                        reponse = resourceIdResponseQ.take();
                     }
                 }
             }
             //if no response available or it has an exception, request a new one
             if (reponse == null || reponse.getException() != null) {
                 ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
-                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
-                reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg);
+                reponse = resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
                     throw new HyracksDataException(reponse.getException().getMessage());
                 }
@@ -72,9 +74,4 @@
             throw new HyracksDataException(e);
         }
     }
-
-    @Override
-    public void deliverMessageResponse(IApplicationMessage message) {
-        resourceIdResponseQ.offer(message);
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
index d201d60..d9a55ad 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
@@ -411,7 +411,7 @@
                 for (Integer partitionId : request.getPartitions()) {
                     nodePartitions.add(clusterPartitions.get(partitionId));
                 }
-                failedTakeoverRequests.add(request.getId());
+                failedTakeoverRequests.add(request.getRequestId());
             }
         }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 52e2818..e5f3555 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -59,9 +59,9 @@
             throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager =
+        IResourceLifecycleManager indexLifeCycleManager =
                 txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index c6743dd..f9f2b89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -55,9 +55,9 @@
             int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 8c91c1a..df7d65d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -53,9 +53,9 @@
             int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 3e11531..39e916b4 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -53,9 +53,9 @@
             int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+        IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 5bf1505..85cdc89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -55,9 +55,9 @@
             throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
-        IIndexLifecycleManager indexLifeCycleManager =
+        IResourceLifecycleManager indexLifeCycleManager =
                 txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index c3cf3e0..851289e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -18,21 +18,25 @@
  */
 package org.apache.asterix.transaction.management.service.recovery;
 
+import java.io.IOError;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 
 public class CheckpointThread extends Thread {
 
+    private static final Logger LOGGER = Logger.getLogger(CheckpointThread.class.getName());
     private long lsnThreshold;
     private long checkpointTermInSecs;
 
     private final ILogManager logManager;
     private final IRecoveryManager recoveryMgr;
 
-    public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager, ILogManager logManager,
+    public CheckpointThread(IRecoveryManager recoveryMgr, ILogManager logManager,
             long lsnThreshold, long checkpointTermInSecs) {
         this.recoveryMgr = recoveryMgr;
         this.logManager = logManager;
@@ -45,24 +49,25 @@
 
         Thread.currentThread().setName("Checkpoint Thread");
 
-        long currentCheckpointAttemptMinLSN = -1;
+        long currentCheckpointAttemptMinLSN;
         long lastCheckpointLSN = -1;
-        long currentLogLSN = 0;
-        long targetCheckpointLSN = 0;
+        long currentLogLSN;
+        long targetCheckpointLSN;
         while (true) {
             try {
                 sleep(checkpointTermInSecs * 1000);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 //ignore
             }
 
-
-            if(lastCheckpointLSN == -1)
-            {
+            if (lastCheckpointLSN == -1) {
                 try {
-                    //Since the system just started up after sharp checkpoint, last checkpoint LSN is considered as the min LSN of the current log partition
+                    //Since the system just started up after sharp checkpoint,
+                    //last checkpoint LSN is considered as the min LSN of the current log partition
                     lastCheckpointLSN = logManager.getReadableSmallestLSN();
                 } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Error getting smallest readable LSN", e);
                     lastCheckpointLSN = 0;
                 }
             }
@@ -82,13 +87,12 @@
                     currentCheckpointAttemptMinLSN = recoveryMgr.checkpoint(false, targetCheckpointLSN);
 
                     //checkpoint was completed at target LSN or above
-                    if(currentCheckpointAttemptMinLSN >= targetCheckpointLSN)
-                    {
+                    if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
                         lastCheckpointLSN = currentCheckpointAttemptMinLSN;
                     }
 
                 } catch (ACIDException | HyracksDataException e) {
-                    throw new Error("failed to checkpoint", e);
+                    throw new IOError(e);
                 }
             }
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 0183b29..eb22c22 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -367,7 +367,7 @@
                                 String resourceAbsolutePath =
                                         partitionIODevicePath + File.separator + localResource.getResourceName();
                                 localResource.setResourcePath(resourceAbsolutePath);
-                                index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
+                                index = (ILSMIndex) datasetLifecycleManager.get(resourceAbsolutePath);
                                 if (index == null) {
                                     //#. create index instance and register to indexLifeCycleManager
                                     localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
@@ -568,7 +568,7 @@
     public long getLocalMinFirstLSN() throws HyracksDataException {
         IDatasetLifecycleManager datasetLifecycleManager =
                 txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-        List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
+        List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index f7ed355..6b25542 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -81,8 +81,7 @@
         this.recoveryManager = new RecoveryManager(this);
 
         if (asterixAppRuntimeContextProvider != null) {
-            this.checkpointThread = new CheckpointThread(recoveryManager,
-                    asterixAppRuntimeContextProvider.getDatasetLifecycleManager(), logManager,
+            this.checkpointThread = new CheckpointThread(recoveryManager, logManager,
                     this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
             this.checkpointThread.start();
         } else {
@@ -95,22 +94,27 @@
         }
     }
 
+    @Override
     public ILogManager getLogManager() {
         return logManager;
     }
 
+    @Override
     public ILockManager getLockManager() {
         return lockManager;
     }
 
+    @Override
     public ITransactionManager getTransactionManager() {
         return transactionManager;
     }
 
+    @Override
     public IRecoveryManager getRecoveryManager() {
         return recoveryManager;
     }
 
+    @Override
     public IAsterixAppRuntimeContextProvider getAsterixAppRuntimeContextProvider() {
         return asterixAppRuntimeContextProvider;
     }
@@ -119,6 +123,7 @@
         return txnProperties;
     }
 
+    @Override
     public String getId() {
         return id;
     }
diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 3a4a56a..887532c 100644
--- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.locking;
 
+import static org.mockito.Mockito.mock;
+
 import java.util.concurrent.Executors;
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -31,9 +33,6 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.file.IFileMapProvider;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
-
-import static org.mockito.Mockito.mock;
 
 class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
 
@@ -86,11 +85,6 @@
     }
 
     @Override
-    public ResourceIdFactory getResourceIdFactory() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public IIOManager getIOManager() {
         throw new UnsupportedOperationException();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
index ad19cf0..2f5a873 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessage.java
@@ -20,9 +20,6 @@
 
 import java.io.Serializable;
 
-/**
- * @author rico
- */
 public interface IMessage extends Serializable {
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
index 2b166d2..8aaa563 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
@@ -18,9 +18,6 @@
  */
 package org.apache.hyracks.api.messages;
 
-/**
- * @author rico
- */
 public interface IMessageBroker {
 
     public void receivedMessage(IMessage message, String nodeId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
index 5228f80..960a11c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/IndexLifecycleManagerProvider.java
@@ -19,14 +19,14 @@
 package org.apache.hyracks.examples.btree.helper;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 
 public enum IndexLifecycleManagerProvider implements IIndexLifecycleManagerProvider {
     INSTANCE;
 
     @Override
-    public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+    public IResourceLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
         return RuntimeContext.get(ctx).getIndexLifecycleManager();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 2e96527..a4909ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
@@ -46,7 +46,7 @@
     private IBufferCache bufferCache;
     private IFileMapManager fileMapManager;
     private ILocalResourceRepository localResourceRepository;
-    private IIndexLifecycleManager lcManager;
+    private IResourceLifecycleManager lcManager;
     private ResourceIdFactory resourceIdFactory;
     private ThreadFactory threadFactory = new ThreadFactory() {
         public Thread newThread(Runnable r) {
@@ -90,7 +90,7 @@
         return resourceIdFactory;
     }
 
-    public IIndexLifecycleManager getIndexLifecycleManager() {
+    public IResourceLifecycleManager getIndexLifecycleManager() {
         return lcManager;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndex.java
index 8ea1f88..be68903 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndex.java
@@ -26,7 +26,7 @@
  * This interface describes the operations common to all indexes. Indexes
  * implementing this interface can easily reuse existing index operators for
  * dataflow. Users must perform operations on an via an {@link IIndexAccessor}.
- * During dataflow, the lifecycle of IIndexes are handled through an {@link IIndexLifecycleManager}.
+ * During dataflow, the lifecycle of IIndexes are handled through an {@link IResourceLifecycleManager}.
  */
 public interface IIndex {
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java
deleted file mode 100644
index 4c00b93..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.common.api;
-
-import java.util.List;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.common.IResourceMemoryManager;
-
-public interface IIndexLifecycleManager extends IResourceMemoryManager {
-    public List<IIndex> getOpenIndexes();
-
-    public void register(String resourcePath, IIndex index) throws HyracksDataException;
-
-    public void open(String resourcePath) throws HyracksDataException;
-
-    public void close(String resourcePath) throws HyracksDataException;
-
-    public IIndex getIndex(String resourcePath) throws HyracksDataException;
-
-    public void unregister(String resourcePath) throws HyracksDataException;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManagerProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManagerProvider.java
index 127ee83..5378610 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManagerProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManagerProvider.java
@@ -23,5 +23,5 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public interface IIndexLifecycleManagerProvider extends Serializable {
-    public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx);
+    public IResourceLifecycleManager<IIndex> getLifecycleManager(IHyracksTaskContext ctx);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IResourceLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IResourceLifecycleManager.java
new file mode 100644
index 0000000..1d18a8f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IResourceLifecycleManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.api;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResourceMemoryManager;
+
+/**
+ * The base interface for resource management
+ *
+ * @param <R>
+ *            resource class
+ */
+public interface IResourceLifecycleManager<R> extends IResourceMemoryManager {
+    /**
+     * get a list of all resources which are opened
+     *
+     * @return
+     */
+    public List<R> getOpenResources();
+
+    /**
+     * Registers a resource.
+     *
+     * @param resourceId
+     * @param resource
+     * @throws HyracksDataException
+     *             if a resource is already registered with this resourceId
+     */
+    public void register(String resourceId, R resource) throws HyracksDataException;
+
+    /**
+     * Opens a resource. The resource is moved to the open state
+     *
+     * @param resourceId
+     * @throws HyracksDataException
+     *             if no resource with the resourceId exists
+     */
+    public void open(String resourceId) throws HyracksDataException;
+
+    /**
+     * closes a resource and free up its allocated resources
+     *
+     * @param resourceId
+     * @throws HyracksDataException
+     */
+    public void close(String resourceId) throws HyracksDataException;
+
+    /**
+     * gets the resource registered with this id
+     *
+     * @param resourceId
+     * @return the resource if registered or null
+     * @throws HyracksDataException
+     */
+    public R get(String resourceId) throws HyracksDataException;
+
+    /**
+     * unregister a resource removing its resources in memory and on disk
+     *
+     * @param resourceId
+     * @throws HyracksDataException
+     */
+    public void unregister(String resourceId) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 2e45c7c..202c0a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.frames.LIFOMetaDataFrame;
 import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.common.file.ILocalResourceFactory;
@@ -38,7 +38,7 @@
 
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
-    protected final IIndexLifecycleManager lcManager;
+    protected final IResourceLifecycleManager<IIndex> lcManager;
     protected final ILocalResourceRepository localResourceRepository;
     protected final IResourceIdFactory resourceIdFactory;
     protected final FileReference file;
@@ -72,8 +72,9 @@
     @Override
     public void create() throws HyracksDataException {
         synchronized (lcManager) {
-            index = lcManager.getIndex(resourcePath);
+            index = lcManager.get(resourcePath);
             if (index != null) {
+                //how is this right?????????? <needs to be fixed>
                 lcManager.unregister(resourcePath);
             } else {
                 index = createIndexInstance();
@@ -109,7 +110,7 @@
                 throw new HyracksDataException("Index does not have a valid resource ID. Has it been created yet?");
             }
 
-            index = lcManager.getIndex(resourcePath);
+            index = lcManager.get(resourcePath);
             if (index == null) {
                 index = createIndexInstance();
                 lcManager.register(resourcePath, index);
@@ -128,7 +129,7 @@
     @Override
     public void destroy() throws HyracksDataException {
         synchronized (lcManager) {
-            index = lcManager.getIndex(resourcePath);
+            index = lcManager.get(resourcePath);
             if (index != null) {
                 lcManager.unregister(resourcePath);
             } else {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index 5953abe..224283d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -30,9 +30,9 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 
-public class IndexLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
+public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>, ILifeCycleComponent {
     private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
 
     private final Map<String, IndexInfo> indexInfos;
@@ -131,6 +131,7 @@
             }
         }
 
+        @Override
         public String toString() {
             return "{index: " + index + ", isOpen: " + isOpen + ", refCount: " + referenceCount + ", lastAccess: "
                     + lastAccess + "}";
@@ -138,7 +139,7 @@
     }
 
     @Override
-    public List<IIndex> getOpenIndexes() {
+    public List<IIndex> getOpenResources() {
         List<IIndex> openIndexes = new ArrayList<IIndex>();
         for (IndexInfo i : indexInfos.values()) {
             if (i.isOpen) {
@@ -165,6 +166,7 @@
         }
     }
 
+    @Override
     public void dumpState(OutputStream os) throws IOException {
         StringBuilder sb = new StringBuilder();
 
@@ -212,7 +214,7 @@
     }
 
     @Override
-    public IIndex getIndex(String resourcePath) throws HyracksDataException {
+    public IIndex get(String resourcePath) throws HyracksDataException {
         IndexInfo info = indexInfos.get(resourcePath);
         return info == null ? null : info.index;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
index ab5be15..ef8f646 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeDataflowHelper.java
@@ -59,7 +59,7 @@
         synchronized (lcManager) {
             if (index == null) {
                 try {
-                    index = lcManager.getIndex(resourcePath);
+                    index = lcManager.get(resourcePath);
                 } catch (HyracksDataException e) {
                     return null;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
index 6e0beb3..e7a1161 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyDataflowHelper.java
@@ -60,7 +60,7 @@
         synchronized (lcManager) {
             if (index == null) {
                 try {
-                    index = lcManager.getIndex(resourcePath);
+                    index = lcManager.get(resourcePath);
                 } catch (HyracksDataException e) {
                     return null;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
index d2d86e6..c047777 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
@@ -23,7 +23,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 
 public interface ILSMMergePolicyFactory extends Serializable {
     // Having two methods to create the merge policy with two different signatures is hacky, but we do it now
@@ -32,7 +32,7 @@
     // in asterix and cannot be seen in hyracks. Thus we pass IHyracksTaskContext and let the merge policy do the casting.
     public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, IHyracksTaskContext ctx);
 
-    public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, IIndexLifecycleManager ilcm);
+    public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, IResourceLifecycleManager ilcm);
 
     public String getName();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
index 08672a5..e75aa7f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
@@ -24,7 +24,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -53,7 +53,7 @@
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new ConstantMergePolicy();
         policy.configure(properties);
         return policy;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
index b3b35f4..052e9af 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
@@ -25,7 +25,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -54,7 +54,7 @@
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new NoMergePolicy();
         policy.configure(properties);
         return policy;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
index 465a5f4..e1e05f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
@@ -25,7 +25,7 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
@@ -55,7 +55,7 @@
     }
 
     @Override
-    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IIndexLifecycleManager ilcm) {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IResourceLifecycleManager ilcm) {
         ILSMMergePolicy policy = new PrefixMergePolicy();
         policy.configure(properties);
         return policy;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeDataflowHelper.java
index 3503df5..9731371 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeDataflowHelper.java
@@ -77,7 +77,7 @@
         synchronized (lcManager) {
             if (index == null) {
                 try {
-                    index = lcManager.getIndex(resourcePath);
+                    index = lcManager.get(resourcePath);
                 } catch (HyracksDataException e) {
                     return null;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestIndexLifecycleManagerProvider.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestIndexLifecycleManagerProvider.java
index f48778e..409b49b 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestIndexLifecycleManagerProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestIndexLifecycleManagerProvider.java
@@ -19,16 +19,17 @@
 package org.apache.hyracks.test.support;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 
 public class TestIndexLifecycleManagerProvider implements IIndexLifecycleManagerProvider {
 
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
-        return TestStorageManagerComponentHolder.getIndexLifecycleManager(ctx);
+    public IResourceLifecycleManager<IIndex> getLifecycleManager(IHyracksTaskContext ctx) {
+        return TestStorageManagerComponentHolder.getIndexLifecycleManager();
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 482dccd..ee5f320 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -28,7 +28,8 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IIndex;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
 import org.apache.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
@@ -51,13 +52,14 @@
     private static IFileMapProvider fileMapProvider;
     private static IOManager ioManager;
     private static ILocalResourceRepository localResourceRepository;
-    private static IIndexLifecycleManager lcManager;
+    private static IResourceLifecycleManager<IIndex> lcManager;
     private static ResourceIdFactory resourceIdFactory;
 
     private static int pageSize;
     private static int numPages;
     private static int maxOpenFiles;
     private final static ThreadFactory threadFactory = new ThreadFactory() {
+        @Override
         public Thread newThread(Runnable r) {
             return new Thread(r);
         }
@@ -73,7 +75,7 @@
         lcManager = null;
     }
 
-    public synchronized static IIndexLifecycleManager getIndexLifecycleManager(IHyracksTaskContext ctx) {
+    public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager() {
         if (lcManager == null) {
             lcManager = new IndexLifecycleManager();
         }
@@ -110,7 +112,8 @@
     public synchronized static ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
         if (localResourceRepository == null) {
             try {
-                ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
+                ILocalResourceRepositoryFactory localResourceRepositoryFactory =
+                        new TransientLocalResourceRepositoryFactory();
                 localResourceRepository = localResourceRepositoryFactory.createRepository();
             } catch (HyracksException e) {
                 //In order not to change the IStorageManagerInterface due to the test code, throw runtime exception.