ASTERIXDB-1058: make Asterix compatible with lazy LSM memory allocation

- Adapt memory budget calculation to lazy LSM memory allocation.
- Add IDatasetLifecycleManager interface.

Change-Id: I4ea1eb129fe3043d43b077473dc29d17a97dfcc2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/408
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 2a15384..2e7c23f 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -24,6 +24,7 @@
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.AsterixCompilerProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixFeedProperties;
@@ -42,16 +43,13 @@
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.feeds.FeedManager;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
-import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -99,7 +97,7 @@
     private AsterixBuildProperties buildProperties;
 
     private AsterixThreadExecutor threadExecutor;
-    private DatasetLifecycleManager indexLifecycleManager;
+    private IDatasetLifecycleManager datasetLifecycleManager;
     private IFileMapManager fileMapManager;
     private IBufferCache bufferCache;
     private ITransactionSubsystem txnSubsystem;
@@ -112,7 +110,7 @@
 
     private IFeedManager feedManager;
 
-    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) throws AsterixException {
+    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
         this.ncApplicationContext = ncApplicationContext;
         compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
         externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
@@ -143,8 +141,7 @@
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
                 ioManager, ncApplicationContext.getNodeId());
-        localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
-                .createRepository();
+        localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
         resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
 
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
@@ -152,8 +149,8 @@
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
                 txnProperties);
 
-        indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
-                MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, (LogManager) txnSubsystem.getLogManager());
+        datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
+                MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager());
 
         isShuttingdown = false;
 
@@ -165,7 +162,7 @@
         lccm.register((ILifeCycleComponent) bufferCache);
         lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
-        lccm.register((ILifeCycleComponent) indexLifecycleManager);
+        lccm.register((ILifeCycleComponent) datasetLifecycleManager);
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
     }
@@ -193,8 +190,8 @@
         return txnSubsystem;
     }
 
-    public IIndexLifecycleManager getIndexLifecycleManager() {
-        return indexLifecycleManager;
+    public IDatasetLifecycleManager getDatasetLifecycleManager() {
+        return datasetLifecycleManager;
     }
 
     public double getBloomFilterFalsePositiveRate() {
@@ -258,12 +255,12 @@
 
     @Override
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
-        return indexLifecycleManager.getVirtualBufferCaches(datasetID);
+        return datasetLifecycleManager.getVirtualBufferCaches(datasetID);
     }
 
     @Override
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
-        return indexLifecycleManager.getOperationTracker(datasetID);
+        return datasetLifecycleManager.getOperationTracker(datasetID);
     }
 
     @Override
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
index 1d504dd..570c3c9 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
@@ -22,10 +22,10 @@
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -58,8 +58,8 @@
     }
 
     @Override
-    public IIndexLifecycleManager getIndexLifecycleManager() {
-        return asterixAppRuntimeContext.getIndexLifecycleManager();
+    public IDatasetLifecycleManager getDatasetLifecycleManager() {
+        return asterixAppRuntimeContext.getDatasetLifecycleManager();
     }
 
     @Override
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3c5549f..f86ed8a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -28,7 +28,6 @@
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -60,7 +59,7 @@
 
     public ILocalResourceRepository getLocalResourceRepository();
 
-    public IIndexLifecycleManager getIndexLifecycleManager();
+    public IDatasetLifecycleManager getDatasetLifecycleManager();
 
     public ResourceIdFactory getResourceIdFactory();
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java
index f978a8a..1e4869e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.common.api;
 
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -35,7 +34,7 @@
 
     public IFileMapProvider getFileMapManager();
 
-    public IIndexLifecycleManager getIndexLifecycleManager();
+    public IDatasetLifecycleManager getDatasetLifecycleManager();
 
     public double getBloomFilterFalsePositiveRate();
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
new file mode 100644
index 0000000..86e6db5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.List;
+
+import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndex;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+
+public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
+    /**
+     * @param datasetID
+     * @param resourceID
+     * @return The corresponding index, or null if it is not found in the registered indexes.
+     * @throws HyracksDataException
+     */
+    IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException;
+
+    /**
+     * Allocates the memory budget of the dataset in the virtual buffer cache.
+     * @param datasetID
+     * @throws HyracksDataException
+     */
+    void allocateDatasetMemory(int datasetID) throws HyracksDataException;
+
+    /**
+     * Flushes all open datasets synchronously.
+     * @throws HyracksDataException
+     */
+    void flushAllDatasets() throws HyracksDataException;
+
+    /**
+     * Schedules asynchronous flush on datasets that have memory components with first LSN < nonSharpCheckpointTargetLSN.
+     * @param nonSharpCheckpointTargetLSN
+     * @throws HyracksDataException
+     */
+    void scheduleAsyncFlushForLaggingDatasets(long nonSharpCheckpointTargetLSN) throws HyracksDataException;
+
+    /**
+     * creates (if necessary) and returns the dataset info.
+     * @param datasetID
+     * @return
+     */
+    DatasetInfo getDatasetInfo(int datasetID);
+
+    /**
+     * @param datasetId
+     *            the dataset id to be flushed.
+     * @param asyncFlush
+     *            a flag indicating whether to wait for the flush to complete or not.
+     * @throws HyracksDataException
+     */
+    void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException;
+
+    /**
+     * creates (if necessary) and returns the primary index operation tracker of a dataset.
+     * @param datasetID
+     * @return
+     */
+    ILSMOperationTracker getOperationTracker(int datasetID);
+
+    /**
+     * creates (if necessary) and returns the dataset virtual buffer caches.
+     * @param datasetID
+     * @return
+     */
+    List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 3b824ec..d78e2cb 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.context;
 
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -28,11 +29,11 @@
 
 public class BaseOperationTracker implements ILSMOperationTracker {
 
-    protected final DatasetLifecycleManager datasetLifecycleManager;
+    protected final IDatasetLifecycleManager datasetLifecycleManager;
     protected final int datasetID;
-    protected DatasetInfo dsInfo;
-    
-    public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID, DatasetInfo dsInfo) {
+    protected final DatasetInfo dsInfo;
+
+    public BaseOperationTracker(IDatasetLifecycleManager datasetLifecycleManager, int datasetID, DatasetInfo dsInfo) {
         this.datasetLifecycleManager = datasetLifecycleManager;
         this.datasetID = datasetID;
         this.dsInfo = dsInfo;
@@ -59,10 +60,6 @@
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
     }
 
-    public void setDatasetInfo(DatasetInfo dsInfo){
-        this.dsInfo = dsInfo;
-    }
-    
     public void exclusiveJobCommitted() throws HyracksDataException {
     }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index 567f75e..a7374d3 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -25,7 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IndexException;
@@ -42,11 +42,11 @@
     private long maxMergableComponentSize;
     private int maxToleranceComponentCount;
 
-    private final DatasetLifecycleManager datasetLifecycleManager;
+    private final IDatasetLifecycleManager datasetLifecycleManager;
     private final int datasetID;
-    
+
     public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) {
-        this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
+        this.datasetLifecycleManager = (DatasetLifecycleManager)datasetLifecycleManager;
         this.datasetID = datasetID;
     }
 
@@ -70,7 +70,7 @@
             }
         }
         if (fullMergeIsRequested) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             accessor.scheduleFullMerge(index.getIOOperationCallback());
             return;
@@ -113,7 +113,7 @@
                     // Reverse the components order back to its original order
                     Collections.reverse(mergableComponents);
 
-                    ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
+                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(
                             NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                     accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
                 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index 385bbbe..ce405fc 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -25,6 +25,7 @@
 import java.util.Set;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -42,8 +43,8 @@
 
     @Override
     public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IHyracksTaskContext ctx) {
-        DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
-                .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+        IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject()).getDatasetLifecycleManager();
         ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(dslcManager, datasetID);
         policy.configure(properties);
         return policy;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 741e106..b2e7b69 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
 import org.apache.asterix.common.config.AsterixStorageProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -37,7 +38,6 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -50,7 +50,7 @@
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
-public class DatasetLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
+public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
     private final AsterixStorageProperties storageProperties;
     private final Map<Integer, List<IVirtualBufferCache>> datasetVirtualBufferCaches;
     private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
@@ -108,6 +108,7 @@
             dsInfo.isExternal = !index.hasMemoryComponents();
             dsInfo.isRegistered = true;
         }
+
         if (dsInfo.indexes.containsKey(resourceID)) {
             throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
         }
@@ -193,21 +194,8 @@
             throw new HyracksDataException("Failed to open index with resource ID " + resourceID
                     + " since it does not exist.");
         }
-
-        // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
         if (!dsInfo.isOpen && !dsInfo.isExternal) {
-            List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
-            assert vbcs != null;
-            long additionalSize = 0;
-            for (IVirtualBufferCache vbc : vbcs) {
-                additionalSize += vbc.getNumPages() * vbc.getPageSize();
-            }
-            while (used + additionalSize > capacity) {
-                if (!evictCandidateDataset()) {
-                    throw new HyracksDataException("Cannot activate index since memory budget would be exceeded.");
-                }
-            }
-            used += additionalSize;
+            initializeDatasetVirtualBufferCache(did);
         }
 
         dsInfo.isOpen = true;
@@ -226,7 +214,6 @@
         // We will take a dataset that has no active transactions, it is open (a dataset consuming memory), 
         // that is not being used (refcount == 0) and has been least recently used. The sort order defined 
         // for DatasetInfo maintains this. See DatasetInfo.compareTo().
-
         List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values());
         Collections.sort(datasetInfosList);
         for (DatasetInfo dsInfo : datasetInfosList) {
@@ -241,7 +228,7 @@
         return false;
     }
 
-    private void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
+    private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
         if (iInfo.isOpen) {
             ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
@@ -261,6 +248,7 @@
         }
     }
 
+    @Override
     public DatasetInfo getDatasetInfo(int datasetID) {
         synchronized (datasetInfos) {
             DatasetInfo dsInfo = datasetInfos.get(datasetID);
@@ -306,32 +294,39 @@
         synchronized (datasetVirtualBufferCaches) {
             List<IVirtualBufferCache> vbcs = datasetVirtualBufferCaches.get(datasetID);
             if (vbcs == null) {
-                vbcs = new ArrayList<IVirtualBufferCache>();
-                int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
-                        .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
-                for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
-                    MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(
-                            new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(), numPages
-                                    / storageProperties.getMemoryComponentsNum()));
-                    vbcs.add(vbc);
+                initializeDatasetVirtualBufferCache(datasetID);
+                vbcs = datasetVirtualBufferCaches.get(datasetID);
+                if (vbcs == null) {
+                    throw new IllegalStateException("Could not find dataset " + datasetID + " virtual buffer cache.");
                 }
-                datasetVirtualBufferCaches.put(datasetID, vbcs);
             }
             return vbcs;
         }
     }
 
-    private void removeDatasetFromCache(int datasetID) {
-        List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(datasetID);
-        assert vbcs != null;
-        for (IVirtualBufferCache vbc : vbcs) {
-            used -= (vbc.getNumPages() * vbc.getPageSize());
-        }
+    private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
+        deallocateDatasetMemory(datasetID);
         datasetInfos.remove(datasetID);
         datasetVirtualBufferCaches.remove(datasetID);
         datasetOpTrackers.remove(datasetID);
     }
 
+    private void initializeDatasetVirtualBufferCache(int datasetID) {
+        List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>();
+        synchronized (datasetVirtualBufferCaches) {
+            int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
+                    .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
+            for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+                MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(
+                        new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(), numPages
+                                / storageProperties.getMemoryComponentsNum()));
+                vbcs.add(vbc);
+            }
+            datasetVirtualBufferCaches.put(datasetID, vbcs);
+        }
+    }
+
+    @Override
     public ILSMOperationTracker getOperationTracker(int datasetID) {
         synchronized (datasetOpTrackers) {
             ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
@@ -376,12 +371,14 @@
         private int numActiveIOOps;
         private boolean isExternal;
         private boolean isRegistered;
+        private boolean memoryAllocated;
 
         public DatasetInfo(int datasetID) {
             this.indexes = new HashMap<Long, IndexInfo>();
             this.lastAccess = -1;
             this.datasetID = datasetID;
             this.isRegistered = false;
+            this.memoryAllocated = false;
         }
 
         @Override
@@ -406,7 +403,7 @@
             notifyAll();
         }
 
-        public synchronized Set<ILSMIndex> getDatasetIndexes() throws HyracksDataException {
+        public synchronized Set<ILSMIndex> getDatasetIndexes() {
             Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
             for (IndexInfo iInfo : indexes.values()) {
                 if (iInfo.isOpen) {
@@ -454,7 +451,16 @@
         @Override
         public String toString() {
             return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
-                    + ", lastAccess: " + lastAccess + "}";
+                    + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
+                    + memoryAllocated;
+        }
+
+        public boolean isMemoryAllocated() {
+            return memoryAllocated;
+        }
+
+        public int getDatasetID() {
+            return datasetID;
         }
     }
 
@@ -463,12 +469,14 @@
         used = 0;
     }
 
+    @Override
     public synchronized void flushAllDatasets() throws HyracksDataException {
         for (DatasetInfo dsInfo : datasetInfos.values()) {
             flushDatasetOpenIndexes(dsInfo, false);
         }
     }
 
+    @Override
     public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
         DatasetInfo datasetInfo = datasetInfos.get(datasetId);
         if (datasetInfo != null) {
@@ -476,6 +484,7 @@
         }
     }
 
+    @Override
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
         //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
         for (DatasetInfo dsInfo : datasetInfos.values()) {
@@ -537,7 +546,6 @@
                 accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
             }
         } else {
-
             for (IndexInfo iInfo : dsInfo.indexes.values()) {
                 // TODO: This is not efficient since we flush the indexes sequentially. 
                 // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
@@ -557,14 +565,12 @@
                     throw new HyracksDataException(e);
                 }
             }
-
         }
         try {
             flushDatasetOpenIndexes(dsInfo, false);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-
         for (IndexInfo iInfo : dsInfo.indexes.values()) {
             if (iInfo.isOpen) {
                 ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
@@ -576,7 +582,6 @@
             assert iInfo.referenceCount == 0;
         }
         dsInfo.isOpen = false;
-
         removeDatasetFromCache(dsInfo.datasetID);
     }
 
@@ -628,4 +633,48 @@
 
         outputStream.write(sb.toString().getBytes());
     }
-}
+
+    @Override
+    public synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
+        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        if (dsInfo == null) {
+            throw new HyracksDataException("Failed to allocate memory for dataset with ID " + datasetId
+                    + " since it is not open.");
+        }
+        synchronized (dsInfo) {
+            // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
+            if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
+                List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+                long additionalSize = 0;
+                for (IVirtualBufferCache vbc : vbcs) {
+                    additionalSize += vbc.getNumPages() * vbc.getPageSize();
+                }
+                while (used + additionalSize > capacity) {
+                    if (!evictCandidateDataset()) {
+                        throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
+                                + " memory since memory budget would be exceeded.");
+                    }
+                }
+                used += additionalSize;
+                dsInfo.memoryAllocated = true;
+            }
+        }
+    }
+
+    private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
+        DatasetInfo dsInfo = datasetInfos.get(datasetId);
+        if (dsInfo == null) {
+            throw new HyracksDataException("Failed to deallocate memory for dataset with ID " + datasetId
+                    + " since it is not open.");
+        }
+        synchronized (dsInfo) {
+            if (dsInfo.isOpen && dsInfo.memoryAllocated) {
+                List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+                for (IVirtualBufferCache vbc : vbcs) {
+                    used -= (vbc.getNumPages() * vbc.getPageSize());
+                }
+                dsInfo.memoryAllocated = false;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 6c63248..c3d75b1 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -22,6 +22,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -48,7 +49,7 @@
     private boolean flushOnExit = false;
     private boolean flushLogCreated = false;
 
-    public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
+    public PrimaryIndexOperationTracker(IDatasetLifecycleManager datasetLifecycleManager, int datasetID,
             ILogManager logManager, DatasetInfo dsInfo) {
         super(datasetLifecycleManager, datasetID, dsInfo);
         this.logManager = logManager;
@@ -59,6 +60,9 @@
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
+            if (!dsInfo.isMemoryAllocated()) {
+                datasetLifecycleManager.allocateDatasetMemory(dsInfo.getDatasetID());
+            }
             incrementNumActiveOperations(modificationCallback);
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
             dsInfo.declareActiveIOOperation();
@@ -77,7 +81,7 @@
     @Override
     public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
             ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             decrementNumActiveOperations(modificationCallback);
             if (numActiveOperations.get() == 0) {
@@ -108,7 +112,7 @@
         }
 
         if (needsFlush || flushOnExit) {
-            //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is schedule.
+            //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled.
             for (ILSMIndex lsmIndex : indexes) {
                 AbstractLSMIndex abstractLSMIndex = ((AbstractLSMIndex) lsmIndex);
                 ILSMOperationTracker opTracker = abstractLSMIndex.getOperationTracker();
@@ -133,7 +137,7 @@
         }
     }
 
-    //Since this method is called sequentially by LogPage.notifyFlushTerminator in the sequence flush were scheduled.
+    //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
     public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
         for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index c965dd2..d308564 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -22,8 +22,8 @@
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -42,7 +42,7 @@
 
     public ITransactionSubsystem getTransactionSubsystem();
 
-    public IIndexLifecycleManager getIndexLifecycleManager();
+    public IDatasetLifecycleManager getDatasetLifecycleManager();
 
     public double getBloomFilterFalsePositiveRate();
 
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java
index 69ea1f1..f030e18 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java
@@ -212,12 +212,19 @@
             // In future this may be changed depending on the requested
             // output format sent to the servlet.
             String errorBody = method.getResponseBodyAsString();
-            JSONObject result = new JSONObject(errorBody);
-            String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
-                    result.getString("stacktrace") };
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
-            throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
-                    + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+            JSONObject result = null;
+            try {
+                result = new JSONObject(errorBody);
+            } catch (Exception e) {
+                throw new Exception(errorBody);
+            }
+            if (result != null) {
+                String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+                        result.getString("stacktrace") };
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
+                throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
+                        + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+            }
         }
         return statusCode;
     }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 698a54f..97260a1 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -85,8 +85,8 @@
  */
 public class MetadataManager implements IMetadataManager {
     private static final int INITIAL_SLEEP_TIME = 64;
-    private static final int RETRY_MULTIPLIER = 4;
-    private static final int MAX_RETRY_COUNT = 6;
+    private static final int RETRY_MULTIPLIER = 5;
+    private static final int MAX_RETRY_COUNT = 10;
 
     // Set in init().
     public static MetadataManager INSTANCE;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index d600f57..07675fe 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -24,6 +24,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
@@ -110,7 +111,7 @@
 
     private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID);
 
-    private IIndexLifecycleManager indexLifecycleManager;
+    private IDatasetLifecycleManager datasetLifecycleManager;
     private ITransactionSubsystem transactionSubsystem;
 
     public static final MetadataNode INSTANCE = new MetadataNode();
@@ -121,7 +122,7 @@
 
     public void initialize(IAsterixAppRuntimeContext runtimeContext) {
         this.transactionSubsystem = runtimeContext.getTransactionSubsystem();
-        this.indexLifecycleManager = runtimeContext.getIndexLifecycleManager();
+        this.datasetLifecycleManager = runtimeContext.getDatasetLifecycleManager();
     }
 
     @Override
@@ -285,9 +286,9 @@
             throws Exception {
         long resourceID = metadataIndex.getResourceID();
         String resourceName = metadataIndex.getFile().toString();
-        ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName);
+        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
         try {
-            indexLifecycleManager.open(resourceName);
+            datasetLifecycleManager.open(resourceName);
 
             // prepare a Callback for logging
             IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
@@ -308,7 +309,7 @@
         } catch (Exception e) {
             throw e;
         } finally {
-            indexLifecycleManager.close(resourceName);
+            datasetLifecycleManager.close(resourceName);
         }
     }
 
@@ -635,9 +636,9 @@
             throws Exception {
         long resourceID = metadataIndex.getResourceID();
         String resourceName = metadataIndex.getFile().toString();
-        ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName);
+        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
         try {
-            indexLifecycleManager.open(resourceName);
+            datasetLifecycleManager.open(resourceName);
             // prepare a Callback for logging
             IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
                     metadataIndex, lsmIndex, IndexOperation.DELETE);
@@ -655,7 +656,7 @@
         } catch (Exception e) {
             throw e;
         } finally {
-            indexLifecycleManager.close(resourceName);
+            datasetLifecycleManager.close(resourceName);
         }
     }
 
@@ -969,8 +970,8 @@
         try {
             IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
             String resourceName = index.getFile().toString();
-            IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
-            indexLifecycleManager.open(resourceName);
+            IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            datasetLifecycleManager.open(resourceName);
             IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -988,11 +989,11 @@
             } finally {
                 rangeCursor.close();
             }
-            indexLifecycleManager.close(resourceName);
+            datasetLifecycleManager.close(resourceName);
 
             index = MetadataPrimaryIndexes.DATASET_DATASET;
-            indexInstance = indexLifecycleManager.getIndex(resourceName);
-            indexLifecycleManager.open(resourceName);
+            indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            datasetLifecycleManager.open(resourceName);
             indexAccessor = indexInstance
                     .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1010,11 +1011,11 @@
             } finally {
                 rangeCursor.close();
             }
-            indexLifecycleManager.close(resourceName);
+            datasetLifecycleManager.close(resourceName);
 
             index = MetadataPrimaryIndexes.INDEX_DATASET;
-            indexInstance = indexLifecycleManager.getIndex(resourceName);
-            indexLifecycleManager.open(resourceName);
+            indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            datasetLifecycleManager.open(resourceName);
             indexAccessor = indexInstance
                     .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1033,7 +1034,7 @@
             } finally {
                 rangeCursor.close();
             }
-            indexLifecycleManager.close(resourceName);
+            datasetLifecycleManager.close(resourceName);
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -1044,8 +1045,8 @@
             IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
         String resourceName = index.getFile().toString();
-        IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
-        indexLifecycleManager.open(resourceName);
+        IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+        datasetLifecycleManager.open(resourceName);
         IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
         ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1074,7 +1075,7 @@
         } finally {
             rangeCursor.close();
         }
-        indexLifecycleManager.close(resourceName);
+        datasetLifecycleManager.close(resourceName);
     }
 
     @Override
@@ -1082,8 +1083,8 @@
         int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
         try {
             String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
-            IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
-            indexLifecycleManager.open(resourceName);
+            IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+            datasetLifecycleManager.open(resourceName);
             try {
                 IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
@@ -1110,7 +1111,7 @@
                     rangeCursor.close();
                 }
             } finally {
-                indexLifecycleManager.close(resourceName);
+                datasetLifecycleManager.close(resourceName);
             }
 
         } catch (Exception e) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index c94133a..0605b47 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -31,6 +31,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -38,7 +39,6 @@
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
@@ -76,7 +76,6 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
@@ -107,7 +106,7 @@
 
     private static IBufferCache bufferCache;
     private static IFileMapProvider fileMapProvider;
-    private static IIndexLifecycleManager indexLifecycleManager;
+    private static IDatasetLifecycleManager dataLifecycleManager;
     private static ILocalResourceRepository localResourceRepository;
     private static IIOManager ioManager;
 
@@ -155,7 +154,7 @@
         nodeNames = metadataProperties.getNodeNames();
         // nodeStores = asterixProperity.getStores();
 
-        indexLifecycleManager = runtimeContext.getIndexLifecycleManager();
+        dataLifecycleManager = runtimeContext.getDatasetLifecycleManager();
         localResourceRepository = runtimeContext.getLocalResourceRepository();
         bufferCache = runtimeContext.getBufferCache();
         fileMapProvider = runtimeContext.getFileMapManager();
@@ -391,8 +390,8 @@
         LSMBTree lsmBtree = null;
         long resourceID = -1;
         ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
-                .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
-                index.getDatasetId().getId(), ((DatasetLifecycleManager) indexLifecycleManager).getDatasetInfo(index
+                .getDatasetId().getId()) : new BaseOperationTracker(dataLifecycleManager,
+                index.getDatasetId().getId(), dataLifecycleManager.getDatasetInfo(index
                         .getDatasetId().getId()));
         final String path = file.getFile().getPath();
         if (create) {
@@ -406,7 +405,7 @@
                     bloomFilterKeyFields,
                     runtimeContext.getBloomFilterFalsePositiveRate(),
                     runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
-                            GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
+                            GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), opTracker,
                     runtimeContext.getLSMIOScheduler(),
                     LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
                     null, null, null, null, true);
@@ -420,11 +419,11 @@
                     localResourceMetadata, LocalResource.LSMBTreeResource);
             ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
             localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
-            indexLifecycleManager.register(path, lsmBtree);
+            dataLifecycleManager.register(path, lsmBtree);
         } else {
             final LocalResource resource = localResourceRepository.getResourceByName(path);
             resourceID = resource.getResourceId();
-            lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resource.getResourceName());
+            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName());
             if (lsmBtree == null) {
                 lsmBtree = LSMBTreeUtils.createLSMTree(
                         virtualBufferCaches,
@@ -436,10 +435,10 @@
                         bloomFilterKeyFields,
                         runtimeContext.getBloomFilterFalsePositiveRate(),
                         runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
-                                GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
+                                GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), opTracker,
                         runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
                                 .createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true);
-                indexLifecycleManager.register(path, lsmBtree);
+                dataLifecycleManager.register(path, lsmBtree);
             }
         }
 
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index b4017fa..122a8ff 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -21,7 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
@@ -73,8 +73,7 @@
                 try {
                     IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                             .getApplicationContext().getApplicationObject();
-                    DatasetLifecycleManager datasetLifeCycleManager = (DatasetLifecycleManager) runtimeCtx
-                            .getIndexLifecycleManager();
+                    IDatasetLifecycleManager datasetLifeCycleManager = runtimeCtx.getDatasetLifecycleManager();
                     ILockManager lockManager = runtimeCtx.getTransactionSubsystem().getLockManager();
                     ITransactionManager txnManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
                     // get the local transaction
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index b8673bb..e5ad1b4 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -107,10 +107,15 @@
             <scope>compile</scope>
         </dependency>
         <dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>18.0</version>
-		</dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.10.19</version>
+        </dependency>
 	</dependencies>
 
 </project>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 4c2f25d..58bc44e 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -55,7 +55,7 @@
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getIndexLifecycleManager();
+                .getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
index 1693685..876fcb3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
@@ -20,7 +20,7 @@
 package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
@@ -37,8 +37,8 @@
 
     @Override
     public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
-        DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
-                .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+        IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject()).getDatasetLifecycleManager();
         return dslcManager.getOperationTracker(datasetID);
     }
 
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 9c897f2..48ebff3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -51,7 +51,7 @@
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getIndexLifecycleManager();
+                .getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index 7b17193..7b1ec04 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -19,8 +19,8 @@
 package org.apache.asterix.transaction.management.opcallbacks;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
@@ -37,8 +37,8 @@
 
     @Override
     public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
-        DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
-                .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+        IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx
+                .getJobletContext().getApplicationContext().getApplicationObject()).getDatasetLifecycleManager();
         return new BaseOperationTracker(dslcManager, datasetID, dslcManager.getDatasetInfo(datasetID));
     }
 
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 4de0749..23eb2be 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -52,7 +52,7 @@
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getIndexLifecycleManager();
+                .getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index c2f56a0..6e0394a 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -52,7 +52,7 @@
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getIndexLifecycleManager();
+                .getDatasetLifecycleManager();
         ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
index 5851240..26f56b7 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -57,10 +56,10 @@
                 bloomFilterKeyFields,
                 runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties,
-                        runtimeContextProvider.getIndexLifecycleManager()),
-                new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                        datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
-                                .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+                        runtimeContextProvider.getDatasetLifecycleManager()),
+                new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(),
+                        datasetID, runtimeContextProvider.getDatasetLifecycleManager()
+                        .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
                 LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), -1, true);
         return lsmBTree;
     }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
index af315f9..beb1bb8 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -69,9 +68,9 @@
                 btreeCmpFactories,
                 runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties,
-                        runtimeContextProvider.getIndexLifecycleManager()),
-                new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                        datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+                        runtimeContextProvider.getDatasetLifecycleManager()),
+                new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(),
+                        datasetID, runtimeContextProvider.getDatasetLifecycleManager()
                                 .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
                 LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), buddyBtreeFields, -1,
                 true);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
index 576eccd..b9f5af9 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -69,10 +68,10 @@
                     rtreePolicyType,
                     runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                     mergePolicyFactory.createMergePolicy(mergePolicyProperties,
-                            runtimeContextProvider.getIndexLifecycleManager()),
-                    new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
-                            .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider
-                            .getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider
+                            runtimeContextProvider.getDatasetLifecycleManager()),
+                    new BaseOperationTracker(runtimeContextProvider
+                            .getDatasetLifecycleManager(), datasetID, runtimeContextProvider
+                            .getDatasetLifecycleManager().getDatasetInfo(datasetID)), runtimeContextProvider
                             .getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
                             .createIOOperationCallback(), linearizeCmpFactory, btreeFields, -1, true);
         } catch (TreeIndexException e) {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 495d56e..0adcf53 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -76,10 +76,10 @@
                 bloomFilterKeyFields,
                 runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                 mergePolicyFactory.createMergePolicy(mergePolicyProperties,
-                        runtimeContextProvider.getIndexLifecycleManager()),
+                        runtimeContextProvider.getDatasetLifecycleManager()),
                 isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
-                        (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID,
-                        ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+                        runtimeContextProvider.getDatasetLifecycleManager(), datasetID,
+                        runtimeContextProvider.getDatasetLifecycleManager()
                                 .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
                 LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), isPrimary, filterTypeTraits,
                 filterCmpFactories, btreeFields, filterFields, true);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index d76702e..e635349 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -90,10 +89,10 @@
                         filePath,
                         runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                         mergePolicyFactory.createMergePolicy(mergePolicyProperties,
-                                runtimeContextProvider.getIndexLifecycleManager()),
-                        new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
-                                .getIndexLifecycleManager(), datasetID,
-                                ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+                                runtimeContextProvider.getDatasetLifecycleManager()),
+                        new BaseOperationTracker(runtimeContextProvider
+                                .getDatasetLifecycleManager(), datasetID,
+                                runtimeContextProvider.getDatasetLifecycleManager()
                                         .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
                         LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
                         invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
@@ -111,10 +110,9 @@
                         filePath,
                         runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                         mergePolicyFactory.createMergePolicy(mergePolicyProperties,
-                                runtimeContextProvider.getIndexLifecycleManager()),
-                        new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
-                                .getIndexLifecycleManager(), datasetID,
-                                ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+                                runtimeContextProvider.getDatasetLifecycleManager()),
+                        new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(), datasetID,
+                                runtimeContextProvider.getDatasetLifecycleManager()
                                         .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
                         LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
                         invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 31a3283..c99e052 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 
 import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -91,10 +90,10 @@
                     rtreePolicyType,
                     runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                     mergePolicyFactory.createMergePolicy(mergePolicyProperties,
-                            runtimeContextProvider.getIndexLifecycleManager()),
-                    new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
-                            .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider
-                            .getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider
+                            runtimeContextProvider.getDatasetLifecycleManager()),
+                    new BaseOperationTracker(runtimeContextProvider
+                            .getDatasetLifecycleManager(), datasetID, runtimeContextProvider
+                            .getDatasetLifecycleManager().getDatasetInfo(datasetID)), runtimeContextProvider
                             .getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
                             .createIOOperationCallback(), linearizeCmpFactory, rtreeFields, btreeFields,
                     filterTypeTraits, filterCmpFactories, filterFields, true);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index f6880db..65ed01c 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -18,17 +18,17 @@
  */
 package org.apache.asterix.transaction.management.service.locking;
 
+import static org.mockito.Mockito.mock;
+
 import java.util.List;
 import java.util.concurrent.Executors;
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -40,7 +40,7 @@
 class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
 
     AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
-    IIndexLifecycleManager ilm = new IndexLifecycleManager();
+    IDatasetLifecycleManager dlcm = mock(IDatasetLifecycleManager.class);
 
     @Override
     public AsterixThreadExecutor getThreadExecutor() {
@@ -63,8 +63,8 @@
     }
 
     @Override
-    public IIndexLifecycleManager getIndexLifecycleManager() {
-        return ilm;
+    public IDatasetLifecycleManager getDatasetLifecycleManager() {
+        return dlcm;
     }
 
     @Override
@@ -106,41 +106,4 @@
     public IAsterixAppRuntimeContext getAppContext() {
         throw new UnsupportedOperationException();
     }
-
-    static class IndexLifecycleManager implements IIndexLifecycleManager {
-        @Override
-        public List<IIndex> getOpenIndexes() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void register(String resourceName, IIndex index) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void open(String resourceName) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void close(String resourceName) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IIndex getIndex(String resourceName) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void unregister(String resourceName) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-    }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index fec04e4..9bc4c53 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -47,8 +47,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.DatasetId;
@@ -70,7 +70,6 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -96,7 +95,7 @@
     private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
     private static final long MEGABYTE = 1024L * 1024L;
     private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
-    private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //2MB;
+    private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //4MB;
 
     /**
      * A file at a known location that contains the LSN of the last log record
@@ -105,7 +104,7 @@
     private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
     private SystemState state;
 
-    public RecoveryManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+    public RecoveryManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
         this.logMgr = (LogManager) txnSubsystem.getLogManager();
         this.checkpointHistory = this.txnSubsystem.getTransactionProperties().getCheckpointHistory();
@@ -268,7 +267,9 @@
             boolean foundWinner = false;
 
             IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
-            IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
+            //get datasetLifeCycleManager
+            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                    .getDatasetLifecycleManager();
             ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
             Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
                     .loadAndGetAllResources();
@@ -320,17 +321,17 @@
                             //get index instance from IndexLifeCycleManager
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
-                            index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName());
+                            index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
                             if (index == null) {
                                 //#. create index instance and register to indexLifeCycleManager
                                 localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                                 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
                                         localResource.getResourceName(), localResource.getPartition());
-                                indexLifecycleManager.register(localResource.getResourceName(), index);
-                                indexLifecycleManager.open(localResource.getResourceName());
+                                datasetLifecycleManager.register(localResource.getResourceName(), index);
+                                datasetLifecycleManager.open(localResource.getResourceName());
 
                                 //#. get maxDiskLastLSN
-                                ILSMIndex lsmIndex = (ILSMIndex) index;
+                                ILSMIndex lsmIndex = index;
                                 maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
                                         .getComponentLSN(lsmIndex.getImmutableComponents());
 
@@ -341,7 +342,7 @@
                             }
 
                             if (LSN > maxDiskLastLsn) {
-                                redo(logRecord);
+                                redo(logRecord, datasetLifecycleManager);
                                 redoCount++;
                             }
                         }
@@ -361,7 +362,7 @@
             //close all indexes
             Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
             for (long r : resourceIdList) {
-                indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
+                datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
             }
 
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -398,8 +399,8 @@
         //right after the new checkpoint file is written.
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
-        DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager) txnSubsystem
-                .getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
         //flush all in-memory components if it is the sharp checkpoint
         if (isSharpCheckpoint) {
             datasetLifecycleManager.flushAllDatasets();
@@ -474,9 +475,9 @@
     }
 
     public long getMinFirstLSN() throws HyracksDataException {
-        IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getIndexLifecycleManager();
-        List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
+        List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
@@ -557,7 +558,7 @@
         return parentDir.listFiles(filter);
     }
 
-    private String getCheckpointFileName(String baseDir, String suffix) {
+    private static String getCheckpointFileName(String baseDir, String suffix) {
         if (!baseDir.endsWith(System.getProperty("file.separator"))) {
             baseDir += System.getProperty("file.separator");
         }
@@ -703,6 +704,8 @@
             //undo loserTxn's effect
             LOGGER.log(Level.INFO, "undoing loser transaction's effect");
 
+            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                    .getDatasetLifecycleManager();
             //TODO sort loser entities by smallest LSN to undo in one pass. 
             Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
             int undoCount = 0;
@@ -719,7 +722,7 @@
                     if (IS_DEBUG_MODE) {
                         LOGGER.info(logRecord.getLogRecordForDisplay());
                     }
-                    undo(logRecord);
+                    undo(logRecord, datasetLifecycleManager);
                     undoCount++;
                 }
             }
@@ -749,10 +752,10 @@
         // do nothing
     }
 
-    private void undo(ILogRecord logRecord) {
+    private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
-                    .getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
+                    logRecord.getResourceId());
             ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
@@ -767,10 +770,10 @@
         }
     }
 
-    private void redo(ILogRecord logRecord) {
+    private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
-                    .getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
+                    logRecord.getResourceId());
             ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
@@ -982,7 +985,7 @@
     public TxnId() {
     }
 
-    private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
+    private static void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
         int readOffset = pkValue.getFieldStart(0);
         byte[] readBuffer = pkValue.getFieldData(0);
         for (int i = 0; i < pkSize; i++) {
@@ -1034,7 +1037,7 @@
         }
     }
 
-    private boolean isEqual(byte[] a, byte[] b, int size) {
+    private static boolean isEqual(byte[] a, byte[] b, int size) {
         for (int i = 0; i < size; i++) {
             if (a[i] != b[i]) {
                 return false;
@@ -1043,7 +1046,7 @@
         return true;
     }
 
-    private boolean isEqual(byte[] a, ITupleReference b, int size) {
+    private static boolean isEqual(byte[] a, ITupleReference b, int size) {
         int readOffset = b.getFieldStart(0);
         byte[] readBuffer = b.getFieldData(0);
         for (int i = 0; i < size; i++) {
@@ -1054,7 +1057,7 @@
         return true;
     }
 
-    private boolean isEqual(ITupleReference a, ITupleReference b, int size) {
+    private static boolean isEqual(ITupleReference a, ITupleReference b, int size) {
         int aOffset = a.getFieldStart(0);
         byte[] aBuffer = a.getFieldData(0);
         int bOffset = b.getFieldStart(0);
@@ -1067,7 +1070,7 @@
         return true;
     }
 
-    public void serialize(ByteBuffer buffer) throws IOException {
+    public void serialize(ByteBuffer buffer) {
         buffer.putInt(jobId);
         buffer.putInt(datasetId);
         buffer.putInt(pkHashValue);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 2551ee2..1686e17 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -19,8 +19,8 @@
 package org.apache.asterix.transaction.management.service.transaction;
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
@@ -65,9 +65,9 @@
     }
 
     @Override
-    public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+    public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
         return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getIndexLifecycleManager();
+                .getDatasetLifecycleManager();
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 09fbb06..eeb65e2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -56,7 +56,7 @@
         this.recoveryManager = new RecoveryManager(this);
         if (asterixAppRuntimeContextProvider != null) {
             this.checkpointThread = new CheckpointThread(recoveryManager,
-                    asterixAppRuntimeContextProvider.getIndexLifecycleManager(),logManager,
+                    asterixAppRuntimeContextProvider.getDatasetLifecycleManager(),logManager,
                     this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
             this.checkpointThread.start();
         } else {