[NO ISSUE][STO] Fix memory leaks in storage

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Added javadocs to:
  -- IBufferCache
  -- IExtraPageBlockHelper
  - Moved IBufferCache.setPageDiskId -> ICachedPage.setDiskPageId
  - Renamed:
  -- IBufferCache.flushDirtyPage -> IBufferCache.flush
  -- IBufferCache.getNumPages -> IBufferCache.getPageBudget
  - Removed:
  -- IBufferCache.adviseWontNeed [not used]
  -- IBufferCache.tryPin [not used]
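
Call sites change accordingly, e.g. (as in the BTree and
LinkedMetaDataPageManager hunks below):

    // before
    bufferCache.setPageDiskId(page, BufferedFileHandle.getDiskPageId(fileId, pageId));
    bufferCache.flushDirtyPage(page);
    // after
    page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, pageId));
    bufferCache.flush(page);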

details:
- Previously, when adding a key-value pair to the metadata of a
  memory component, we always appended a new Pair item to the
  ArrayList, even when the key was already present. After this
  change, an existing entry is updated in place and a new Pair is
  added only for a previously unseen key (sketch 1 below).
- VirtualBufferCache used to leak pages when reclaiming the pages
  of a file after its deletion. This has also been fixed (sketch 2
  below).
- New tests added for VirtualBufferCache:
  - A check that the full memory budget is available again at the
    end of testDisjointPins
  - Multiple users pinning pages concurrently (sketch 3 below)
  - A test for large pages, ensuring that allocated large pages
    are accounted for through the removal of cached free pages.
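
Sketch 1: the fixed put() logic in MemoryComponentMetadata (taken
from the hunk below) -- a new Pair is appended only when the key is
absent; otherwise the existing entry is updated in place:

    ArrayBackedValueStorage stored = get(key);
    if (stored == null) {
        stored = new ArrayBackedValueStorage();
        store.add(Pair.of(key, stored)); // append only for a new key
    }
    stored.assign(value); // existing keys are updated in place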
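
Sketch 2: the shape of the VirtualBufferCache leak (illustrative
only -- pageMap, freePages and pagesOf() are hypothetical names,
not the actual cache internals): entries of a deleted file were
dropped from the page table without their pages being returned to
the free list, permanently shrinking the usable budget:

    for (ICachedPage page : pagesOf(deletedFileId)) {
        pageMap.remove(page);  // page no longer reachable through the cache
        freePages.add(page);   // the fix: recycle the page into the budget
    }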
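
Sketch 3: the shape of the new concurrency test (a sketch only;
vbc, NUM_USERS and PAGES_PER_USER are placeholders -- just the
IBufferCache pin/unpin signatures are assumed):

    ExecutorService executor = Executors.newFixedThreadPool(NUM_USERS);
    List<Future<Void>> futures = new ArrayList<>();
    for (int i = 0; i < NUM_USERS; i++) {
        final int fileId = i;
        futures.add(executor.submit(() -> {
            for (int p = 0; p < PAGES_PER_USER; p++) {
                ICachedPage page = vbc.pin(BufferedFileHandle.getDiskPageId(fileId, p), true);
                vbc.unpin(page);
            }
            return null;
        }));
    }
    for (Future<Void> f : futures) {
        f.get(); // surface any worker failure
    }
    executor.shutdown();
    // once all pins are released, the cache should be back to its
    // full page budget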

Change-Id: I4ae9736c9b5fdba5795245bdf835c023e3f73b15
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2115
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index cbcc44f..d13a15d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -31,7 +31,6 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveNotificationHandler;
-import org.apache.asterix.active.SingleThreadEventProcessor;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -48,6 +47,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 
 public class ActiveNotificationHandler extends SingleThreadEventProcessor<ActiveEvent>
         implements IActiveNotificationHandler, IJobLifecycleListener {
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index ce7eb3d..7eba9eb 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -77,7 +77,7 @@
   </property>
   <property>
     <name>storage.memorycomponent.numpages</name>
-    <value>8</value>
+    <value>16</value>
     <description>The number of pages to allocate for a memory component.
       This budget is shared by all the memory components of the primary
       index and all its secondary indexes across all I/O devices on a node.
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 104f80b..b155b51 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.bootstrap;
 
 import java.io.File;
+import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,20 +37,27 @@
 import org.apache.asterix.common.context.TransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
@@ -81,11 +89,13 @@
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.apache.hyracks.storage.common.IResourceFactory;
@@ -113,9 +123,7 @@
     public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
     public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = TransactionSubsystemProvider.INSTANCE;
     // Mutables
-    private JobId jobId;
     private long jobCounter = 0L;
-    private IHyracksJobletContext jobletCtx;
     private final String testConfigFileName;
     private final boolean runHDFS;
 
@@ -137,9 +145,6 @@
             th.printStackTrace();
             throw th;
         }
-        jobletCtx = Mockito.mock(IHyracksJobletContext.class);
-        Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
-        Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
     }
 
     public void deInit() throws Exception {
@@ -147,20 +152,24 @@
         ExecutionTestUtil.tearDown(cleanupOnStop);
     }
 
-    public org.apache.asterix.common.transactions.JobId getTxnJobId() {
-        return new org.apache.asterix.common.transactions.JobId((int) jobId.getId());
+    public org.apache.asterix.common.transactions.JobId getTxnJobId(IHyracksTaskContext ctx) {
+        return new org.apache.asterix.common.transactions.JobId((int) ctx.getJobletContext().getJobId().getId());
     }
 
     public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
-            Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
-            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields,
+            Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
             int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
-            StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
+            StorageComponentProvider storageComponentProvider)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+                DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
+                mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
         IndexOperation op = IndexOperation.INSERT;
         IModificationOperationCallbackFactory modOpCallbackFactory =
-                new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
+                new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
                         primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
                         ResourceType.LSM_BTREE);
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
@@ -170,7 +179,7 @@
                 primaryIndexInfo.primaryIndexInsertFieldsPermutations,
                 recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), op,
                 true, indexHelperFactory, modOpCallbackFactory, null);
-        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
+        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
                 primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
@@ -204,8 +213,7 @@
     }
 
     public JobId newJobId() {
-        jobId = new JobId(jobCounter++);
-        return jobId;
+        return new JobId(jobCounter++);
     }
 
     public IResourceFactory getPrimaryResourceFactory(IHyracksTaskContext ctx, PrimaryIndexInfo primaryIndexInfo,
@@ -225,18 +233,22 @@
     }
 
     public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
-            ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
-            int[] filterFields, IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
-            List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
+            ARecordType metaType, int[] filterFields, IStorageComponentProvider storageComponentProvider,
+            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
+            throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+                DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
-                mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
+                mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
         Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(),
                 MetadataUtil.PENDING_NO_OP);
         MetadataProvider mdProvider = new MetadataProvider(
                 (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse);
         try {
             IResourceFactory resourceFactory = dataset.getResourceFactory(mdProvider, primaryIndexInfo.index,
-                    recordType, metaType, mergePolicyFactory, mergePolicyProperties);
+                    recordType, metaType, mergePolicy.first, mergePolicy.second);
             IndexBuilderFactory indexBuilderFactory =
                     new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
                             primaryIndexInfo.getFileSplitProvider(), resourceFactory, !dataset.isTemp());
@@ -283,6 +295,10 @@
         if (withMessaging) {
             TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
         }
+        JobId jobId = newJobId();
+        IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+        Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
+        Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
         ctx = Mockito.spy(ctx);
         Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
         Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
@@ -410,4 +426,84 @@
                 (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
         return appCtx.getStorageManager();
     }
+
+    public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeline(IHyracksTaskContext ctx,
+            Dataset dataset, IAType[] keyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
+            int[] keyIndexes, List<Integer> keyIndicators, StorageComponentProvider storageComponentProvider,
+            IFrameOperationCallbackFactory frameOpCallbackFactory, boolean hasSecondaries) throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+                DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, keyTypes, recordType, metaType,
+                mergePolicy.first, mergePolicy.second, filterFields, keyIndexes, keyIndicators);
+        IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+                storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+        ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+                storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+        IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+        IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        LSMPrimaryUpsertOperatorNodePushable insertOp = new LSMPrimaryUpsertOperatorNodePushable(ctx, PARTITION,
+                indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations,
+                recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
+                modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1,
+                frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
+                MissingWriterFactory.INSTANCE, hasSecondaries);
+        RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
+                filterFields == null ? 0 : filterFields.length, recordType, metaType);
+        // fix pk fields
+        int diff = upsertOutRecDesc.getFieldCount() - primaryIndexInfo.rDesc.getFieldCount();
+        int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
+        for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
+            pkFieldsInCommitOp[i] = diff + i;
+        }
+        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
+                false, true, PARTITION, true);
+        insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc);
+        commitOp.setInputRecordDescriptor(0, upsertOutRecDesc);
+        return Pair.of(insertOp, commitOp);
+    }
+
+    private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields,
+            ARecordType itemType, ARecordType metaItemType) throws Exception {
+        ITypeTraits[] outputTypeTraits =
+                new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+        ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
+                + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+        // add the previous record first
+        int f = 0;
+        outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+        f++;
+        // add the previous meta second
+        if (dataset.hasMetaPart()) {
+            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
+            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+            f++;
+        }
+        // add the previous filter third
+        int fieldIdx = -1;
+        if (numFilterFields > 0) {
+            String filterField = DatasetUtil.getFilterField(dataset).get(0);
+            String[] fieldNames = itemType.getFieldNames();
+            int i = 0;
+            for (; i < fieldNames.length; i++) {
+                if (fieldNames[i].equals(filterField)) {
+                    break;
+                }
+            }
+            fieldIdx = i;
+            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
+                    .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                    .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+            f++;
+        }
+        for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
+            outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
+            outputSerDes[j + f] = inputRecordDesc.getFields()[j];
+        }
+        return new RecordDescriptor(outputSerDes, outputTypeTraits);
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 8d21b55..c50a4a2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.test.active;
 
-import org.apache.asterix.active.SingleThreadEventProcessor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 
 public class Actor extends SingleThreadEventProcessor<Action> {
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 76bec8c..36cb4bb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -132,12 +132,12 @@
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
                         partitioningKeys, null, null, null, false, null, false),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
-        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
-                new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+        PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
         IndexDataflowHelperFactory iHelperFactory =
                 new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
         ctx = nc.createTestContext(false);
@@ -146,9 +146,9 @@
         lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
         indexDataflowHelper.close();
         nc.newJobId();
-        txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
-                null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+        txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+        insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                KEY_INDICATORS_LIST, storageManager).getLeft();
     }
 
     @After
@@ -174,7 +174,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -224,7 +224,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -254,9 +254,9 @@
 
             // insert again
             nc.newJobId();
-            txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-            insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
-                    null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+            txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+            insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+                    KEY_INDICATORS_LIST, storageManager).getLeft();
             insertOp.open();
             for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
                 ITupleReference tuple = tupleGenerator.next();
@@ -291,7 +291,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -360,7 +360,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -416,7 +416,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -481,7 +481,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -541,7 +541,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -603,7 +603,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
@@ -675,7 +675,7 @@
             VSizeFrame frame = new VSizeFrame(ctx);
             FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
             for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
-                // flush every 1000 records
+                // flush every RECORDS_PER_COMPONENT records
                 if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
                     if (tupleAppender.getTupleCount() > 0) {
                         tupleAppender.write(insertOp, true);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index a0e3aa9..82eb16a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -110,20 +110,19 @@
             StorageComponentProvider storageManager = new StorageComponentProvider();
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
-            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
-                    NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
                             partitioningKeys, null, null, null, false, null, false),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
-                        new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+                PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+                        storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
-                ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
-                LSMInsertDeleteOperatorNodePushable insertOp = nc
-                        .getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
-                                null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager)
-                        .getLeft();
+                ITransactionContext txnCtx =
+                        nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 41b3d38..5384c92 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -119,21 +119,20 @@
             nc.init();
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
-            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
-                    NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
                             partitioningKeys, null, null, null, false, null, false),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
-                        null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
+                        KEY_INDICATOR_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(false);
                 nc.newJobId();
-                ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                ITransactionContext txnCtx =
+                        nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp = nc
-                        .getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
-                                null, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
-                        .getLeft();
+                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();
                 insertOp.open();
                 TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
                         RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
index 58697a9..eb47248 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -119,26 +119,23 @@
             StorageComponentProvider storageManager = new StorageComponentProvider();
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
-            Dataset dataset =
-                    new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null,
-                            null,
-                            new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null,
-                                    null, false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0);
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                    NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                            partitioningKeys, null, null, null, false, null, false),
+                    null, DatasetType.INTERNAL, DATASET_ID, 0);
             try {
-                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
-                        null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
+                        KEY_INDICATOR_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(false);
                 nc.newJobId();
-                ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                ITransactionContext txnCtx =
+                        nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
                 // Prepare insert operation
-                LSMInsertDeleteOperatorNodePushable insertOp =
-                        nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
-                                new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
-                                .getLeft();
+                LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();
                 insertOp.open();
-                TupleGenerator tupleGenerator =
-                        new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION,
-                                UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+                TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
+                        RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
                 VSizeFrame frame = new VSizeFrame(ctx);
                 FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
                 // Insert records until disk becomes full
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 895cd55..6d3b6c2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -229,7 +229,6 @@
     public static final int ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED = 3097;
     public static final int CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER = 3098;
     public static final int DOUBLE_INITIALIZATION_OF_ACTIVE_NOTIFICATION_HANDLER = 3099;
-    public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 3100;
     public static final int DOUBLE_RECOVERY_ATTEMPTS = 3101;
     public static final int UNREPORTED_TASK_FAILURE_EXCEPTION = 3102;
     public static final int ACTIVE_ENTITY_ALREADY_SUSPENDED = 3103;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 98b8e2f..e428721 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -218,7 +218,6 @@
 3097 = Active Entity %1$s has not been registered
 3098 = Cannot deregister %1$s because it is active
 3099 = Attempt to initialize an initialized Active Notification Handler
-3100 = Failed to shutdown event processor for %1$s
 3101 = Recovery request while recovery is currently ongoing
 3102 = Unreported exception causing task failure
 3103 = %1$s is already suspended and has state %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index a071345..42da1d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -133,6 +133,10 @@
     public static final int ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT = 97;
     public static final int A_FLUSH_OPERATION_HAS_FAILED = 98;
     public static final int A_MERGE_OPERATION_HAS_FAILED = 99;
+    public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 100;
+    public static final int PAGE_DOES_NOT_EXIST_IN_FILE = 101;
+    public static final int VBC_ALREADY_OPEN = 102;
+    public static final int VBC_ALREADY_CLOSED = 103;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
similarity index 91%
rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
index de6682d..21965c7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.active;
+package org.apache.hyracks.api.util;
 
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class SingleThreadEventProcessor<T> implements Runnable {
@@ -75,7 +74,7 @@
             LOGGER.log(Level.WARNING,
                     "Failed to stop event processor after " + attempt + " attempts. Interrupted exception swallowed?");
             if (attempt == 10) {
-                throw new RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+                throw HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
             }
             executorThread.interrupt();
             executorThread.join(1000);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index ae579b4..bc83f8c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -116,5 +116,9 @@
 97 = Illegal attempt to exit empty component
 98 = A flush operation has failed
 99 = A merge operation has failed
+100 = Failed to shutdown event processor for %1$s
+101 = Page %1$s does not exist in file %2$s
+102 = Failed to open virtual buffer cache since it is already open
+103 = Failed to close virtual buffer cache since it is already closed
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 77f18ea..05417a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,19 +25,6 @@
 
     protected int length;
 
-    /**
-     * copies the content of this pointable to the passed byte array.
-     * the array is expected to be at least of length = length of this pointable
-     *
-     * @param copy
-     *            the array to write into
-     * @throws ArrayIndexOutOfBoundsException
-     *             if the passed array size is smaller than length
-     */
-    public void copyInto(byte[] copy) {
-        System.arraycopy(bytes, start, copy, 0, length);
-    }
-
     @Override
     public void set(byte[] bytes, int start, int length) {
         this.bytes = bytes;
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
new file mode 100644
index 0000000..5de0b84
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class DataUtils {
+
+    private DataUtils() {
+    }
+
+    /**
+     * Copies the content of the given value into the passed byte array.
+     * The array is expected to be at least of length = length of the value.
+     *
+     * @param value
+     *            the value to be copied
+     * @param copy
+     *            the array to write into
+     * @throws ArrayIndexOutOfBoundsException
+     *             if the passed array size is smaller than length
+     */
+    public static void copyInto(IValueReference value, byte[] copy) {
+        System.arraycopy(value.getByteArray(), value.getStartOffset(), copy, 0, value.getLength());
+    }
+
+    /**
+     * Copies the content of the given value into the passed byte array at the given offset.
+     * The array is expected to be at least of length = offset + length of the value.
+     *
+     * @param value
+     *            the value to be copied
+     * @param copy
+     *            the array to write into
+     * @param offset
+     *            the offset to start writing from
+     * @throws ArrayIndexOutOfBoundsException
+     *             if the passed array size - offset is smaller than length
+     */
+    public static void copyInto(IValueReference value, byte[] copy, int offset) {
+        System.arraycopy(value.getByteArray(), value.getStartOffset(), copy, offset, value.getLength());
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 5e3042e..1fbcbb0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -1135,7 +1135,7 @@
 
                 ((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
                 int finalPageId = freePageManager.takePage(metaFrame);
-                bufferCache.setPageDiskId(frontier.page, BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
+                frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
                 pagesToWrite.add(frontier.page);
                 splitKey.setLeftPage(finalPageId);
 
@@ -1156,7 +1156,7 @@
             if (level < 1) {
                 ICachedPage lastLeaf = nodeFrontiers.get(level).page;
                 int lastLeafPage = nodeFrontiers.get(level).pageId;
-                setPageDpid(lastLeaf, nodeFrontiers.get(level).pageId);
+                lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId));
                 queue.put(lastLeaf);
                 nodeFrontiers.get(level).page = null;
                 persistFrontiers(level + 1, lastLeafPage);
@@ -1171,7 +1171,7 @@
             }
             ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
             int finalPageId = freePageManager.takePage(metaFrame);
-            setPageDpid(frontier.page, finalPageId);
+            frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
             queue.put(frontier.page);
             frontier.pageId = finalPageId;
 
@@ -1193,10 +1193,6 @@
         public void abort() throws HyracksDataException {
             super.handleException();
         }
-
-        private void setPageDpid(ICachedPage page, int pageId) {
-            bufferCache.setPageDiskId(page, BufferedFileHandle.getDiskPageId(getFileId(), pageId));
-        }
     }
 
     @SuppressWarnings("rawtypes")
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 1ee48f6..a051364 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -219,7 +219,7 @@
                 confiscatedPage.releaseWriteLatch(false);
             }
             int finalMetaPage = getMaxPageId(metaFrame) + 1;
-            bufferCache.setPageDiskId(confiscatedPage, BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
+            confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
             queue.put(confiscatedPage);
             bufferCache.finishQueue();
             metadataPage = getMetadataPageId();
@@ -345,8 +345,10 @@
             try {
                 frame.setPage(page);
                 int inPageOffset = frame.getOffset(key);
-                return inPageOffset >= 0 ? ((long) pageId * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
-                        + IBufferCache.RESERVED_HEADER_BYTES : -1L;
+                return inPageOffset >= 0
+                        ? ((long) pageId * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
+                                + IBufferCache.RESERVED_HEADER_BYTES
+                        : -1L;
             } finally {
                 page.releaseReadLatch();
                 unpinPage(page);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 686fe78..d8afd12 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -195,7 +195,7 @@
             metaFrame.setMaxPage(1);
         } finally {
             metaNode.releaseWriteLatch(true);
-            bufferCache.flushDirtyPage(metaNode);
+            bufferCache.flush(metaNode);
             bufferCache.unpin(metaNode);
         }
         int rootPage = getRootPageId();
@@ -207,7 +207,7 @@
             leafFrame.initBuffer((byte) 0);
         } finally {
             rootNode.releaseWriteLatch(true);
-            bufferCache.flushDirtyPage(rootNode);
+            bufferCache.flush(rootNode);
             bufferCache.unpin(rootNode);
         }
     }
@@ -249,7 +249,7 @@
                 metaFrame.setValid(true);
             } finally {
                 metaNode.releaseWriteLatch(true);
-                bufferCache.flushDirtyPage(metaNode);
+                bufferCache.flush(metaNode);
                 bufferCache.unpin(metaNode);
                 ready = true;
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index ae66402..f03a358 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -342,4 +342,9 @@
     public boolean hasMemoryComponents() {
         return true;
     }
+
+    @Override
+    public String toString() {
+        return "{\"class\":\"" + getClass().getSimpleName() + "\",\"file\":\"" + file.getRelativePath() + "\"}";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 280cc52..0b59e91 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -197,4 +197,9 @@
                 .addBulkLoader(createIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex));
         return chainedBulkLoader;
     }
+
+    @Override
+    public String toString() {
+        return "{\"class\":" + getClass().getSimpleName() + "\", \"index\":" + getIndex().toString() + "}";
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index a4fe35c..57db635 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -255,6 +255,6 @@
     @Override
     public long getSize() {
         IBufferCache virtualBufferCache = getIndex().getBufferCache();
-        return virtualBufferCache.getNumPages() * (long) virtualBufferCache.getPageSize();
+        return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
index b7d2ea3..d1244ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
@@ -45,7 +45,7 @@
 
     @Override
     public IValueReference get(IValueReference key) throws HyracksDataException {
-        IPointable value = VoidPointable.FACTORY.createPointable();
+        VoidPointable value = VoidPointable.FACTORY.createPointable();
         get(key, value);
         return value;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
index dcc9355..1b827b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
@@ -20,6 +20,8 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,6 +33,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
 
 public class MemoryComponentMetadata implements IComponentMetadata {
+    private static final Logger LOGGER = Logger.getLogger(MemoryComponentMetadata.class.getName());
     private static final byte[] empty = new byte[0];
     private final List<org.apache.commons.lang3.tuple.Pair<IValueReference, ArrayBackedValueStorage>> store =
             new ArrayList<>();
@@ -43,9 +46,9 @@
         ArrayBackedValueStorage stored = get(key);
         if (stored == null) {
             stored = new ArrayBackedValueStorage();
+            store.add(Pair.of(key, stored));
         }
         stored.assign(value);
-        store.add(Pair.of(key, stored));
     }
 
     /**
@@ -71,8 +74,12 @@
     }
 
     public void copy(IMetadataPageManager mdpManager) throws HyracksDataException {
+        LOGGER.log(Level.INFO, "Copying Metadata into a different component");
         ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame();
         for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.log(Level.INFO, "Copying " + pair.getKey() + " : " + pair.getValue().getLength() + " bytes");
+            }
             mdpManager.put(frame, pair.getKey(), pair.getValue());
         }
     }
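This put() change is the metadata leak fix: previously every call appended a new Pair to the list even when the key was already present, so repeated updates of the same key grew the store without bound. A minimal, self-contained sketch of the update-or-insert pattern, using a hypothetical String-keyed store in place of IValueReference/ArrayBackedValueStorage:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.commons.lang3.tuple.MutablePair;

    class UpdateOrInsertSketch {
        private final List<MutablePair<String, String>> store = new ArrayList<>();

        void put(String key, String value) {
            MutablePair<String, String> entry = get(key);
            if (entry == null) {
                entry = MutablePair.of(key, null);
                store.add(entry); // add the entry only once, the first time the key is seen
            }
            entry.setValue(value); // always refresh the value in place
        }

        private MutablePair<String, String> get(String key) {
            for (MutablePair<String, String> e : store) {
                if (e.getKey().equals(key)) {
                    return e;
                }
            }
            return null;
        }
    }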
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 83e140c..7a3d58b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -20,8 +20,6 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -34,7 +32,6 @@
 import org.apache.hyracks.util.JSONUtil;
 
 public class MultitenantVirtualBufferCache implements IVirtualBufferCache {
-    private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName());
 
     private final IVirtualBufferCache vbc;
     private int openCount;
@@ -65,11 +62,6 @@
     }
 
     @Override
-    public ICachedPage tryPin(long dpid) throws HyracksDataException {
-        return vbc.tryPin(dpid);
-    }
-
-    @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
         return vbc.pin(dpid, newPage);
     }
@@ -80,8 +72,8 @@
     }
 
     @Override
-    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
-        vbc.flushDirtyPage(page);
+    public void flush(ICachedPage page) throws HyracksDataException {
+        vbc.flush(page);
     }
 
     @Override
@@ -100,8 +92,8 @@
     }
 
     @Override
-    public int getNumPages() {
-        return vbc.getNumPages();
+    public int getPageBudget() {
+        return vbc.getPageBudget();
     }
 
     @Override
@@ -141,14 +133,6 @@
     }
 
     @Override
-    public void adviseWontNeed(ICachedPage page) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.log(Level.INFO, "Calling adviseWontNeed on " + this.getClass().getName()
-                    + " makes no sense as this BufferCache cannot evict pages");
-        }
-    }
-
-    @Override
     public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
         return vbc.confiscatePage(dpid);
     }
@@ -175,11 +159,6 @@
     }
 
     @Override
-    public void setPageDiskId(ICachedPage page, long dpid) {
-
-    }
-
-    @Override
     public void returnPage(ICachedPage page, boolean reinsert) {
 
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
index 3195f57..8ce636b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
@@ -31,13 +31,13 @@
 public class NoMergePolicyFactory implements ILSMMergePolicyFactory {
 
     private static final long serialVersionUID = 1L;
-
     private static final String[] SET_VALUES = new String[] {};
     private static final Set<String> PROPERTIES_NAMES = new HashSet<>(Arrays.asList(SET_VALUES));
+    public static final String NAME = "no-merge";
 
     @Override
     public String getName() {
-        return "no-merge";
+        return NAME;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index 83e377a..3a22793 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -19,15 +19,16 @@
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
@@ -38,42 +39,71 @@
 import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
 import org.apache.hyracks.storage.common.buffercache.VirtualPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
-import org.apache.hyracks.storage.common.file.IFileMapManager;
 import org.apache.hyracks.storage.common.file.FileMapManager;
+import org.apache.hyracks.storage.common.file.IFileMapManager;
 import org.apache.hyracks.util.JSONUtil;
 
 public class VirtualBufferCache implements IVirtualBufferCache {
-    private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName());
-
-    private static final int OVERFLOW_PADDING = 8;
+    private static final Logger LOGGER = Logger.getLogger(VirtualBufferCache.class.getName());
 
     private final ICacheMemoryAllocator allocator;
     private final IFileMapManager fileMapManager;
     private final int pageSize;
-    private final int numPages;
-
+    private final int pageBudget;
     private final CacheBucket[] buckets;
-    private final ArrayList<VirtualPage> pages;
-
-    private volatile int nextFree;
+    private final BlockingQueue<VirtualPage> freePages;
     private final AtomicInteger largePages;
-
+    private final AtomicInteger used;
     private boolean open;
 
-    public VirtualBufferCache(ICacheMemoryAllocator allocator, int pageSize, int numPages) {
+    public VirtualBufferCache(ICacheMemoryAllocator allocator, int pageSize, int pageBudget) {
         this.allocator = allocator;
         this.fileMapManager = new FileMapManager();
         this.pageSize = pageSize;
-        this.numPages = 2 * (numPages / 2) + 1;
-
-        buckets = new CacheBucket[this.numPages];
-        pages = new ArrayList<>();
-        nextFree = 0;
+        if (pageBudget <= 0) {
+            throw new IllegalArgumentException("Page budget must be positive");
+        }
+        this.pageBudget = pageBudget;
+        buckets = new CacheBucket[this.pageBudget];
+        freePages = new ArrayBlockingQueue<>(this.pageBudget);
         largePages = new AtomicInteger(0);
+        used = new AtomicInteger(0);
         open = false;
     }
 
     @Override
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    @Override
+    public int getPageSizeWithHeader() {
+        return pageSize;
+    }
+
+    public int getLargePages() {
+        return largePages.get();
+    }
+
+    public int getUsage() {
+        return used.get();
+    }
+
+    public int getPreAllocatedPages() {
+        return freePages.size();
+    }
+
+    @Override
+    public int getPageBudget() {
+        return pageBudget;
+    }
+
+    @Override
+    public boolean isFull() {
+        return used.get() >= pageBudget;
+    }
+
+    @Override
     public int createFile(FileReference fileRef) throws HyracksDataException {
         synchronized (fileMapManager) {
             return fileMapManager.registerFile(fileRef);
@@ -82,16 +112,28 @@
 
     @Override
     public int openFile(FileReference fileRef) throws HyracksDataException {
-        synchronized (fileMapManager) {
-            if (fileMapManager.isMapped(fileRef)) {
-                return fileMapManager.lookupFileId(fileRef);
+        try {
+            synchronized (fileMapManager) {
+                if (fileMapManager.isMapped(fileRef)) {
+                    return fileMapManager.lookupFileId(fileRef);
+                }
+                return fileMapManager.registerFile(fileRef);
             }
-            return fileMapManager.registerFile(fileRef);
+        } finally {
+            logStats();
+        }
+    }
+
+    private void logStats() {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.log(Level.INFO, "Free (allocated) pages = " + freePages.size() + ". Budget = " + pageBudget
+                    + ". Large pages = " + largePages.get() + ". Overall usage = " + used.get());
         }
     }
 
     @Override
     public void openFile(int fileId) throws HyracksDataException {
+        logStats();
     }
 
     @Override
@@ -111,6 +153,7 @@
         synchronized (fileMapManager) {
             fileMapManager.unregisterFile(fileId);
         }
+        int reclaimedPages = 0;
         for (int i = 0; i < buckets.length; i++) {
             final CacheBucket bucket = buckets[i];
             bucket.bucketLock.lock();
@@ -119,16 +162,20 @@
                 VirtualPage curr = bucket.cachedPage;
                 while (curr != null) {
                     if (BufferedFileHandle.getFileId(curr.dpid()) == fileId) {
-                        if (curr.getFrameSizeMultiplier() > 1) {
+                        reclaimedPages++;
+                        if (curr.isLargePage()) {
                             largePages.getAndAdd(-curr.getFrameSizeMultiplier());
+                            used.addAndGet(-curr.getFrameSizeMultiplier());
+                        } else {
+                            used.decrementAndGet();
                         }
                         if (prev == null) {
                             bucket.cachedPage = curr.next();
-                            curr.reset();
+                            recycle(curr);
                             curr = bucket.cachedPage;
                         } else {
                             prev.next(curr.next());
-                            curr.reset();
+                            recycle(curr);
                             curr = prev.next();
                         }
                     } else {
@@ -140,54 +187,27 @@
                 bucket.bucketLock.unlock();
             }
         }
-        defragPageList();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.log(Level.INFO, "Reclaimed pages = " + reclaimedPages);
+        }
+        logStats();
     }
 
-    private void defragPageList() {
-        synchronized (pages) {
-            int start = 0;
-            int end = nextFree - 1;
-            while (start < end) {
-                VirtualPage lastUsed = pages.get(end);
-                while (end > 0 && lastUsed.dpid() == -1) {
-                    --end;
-                    lastUsed = pages.get(end);
-                }
-
-                if (end == 0) {
-                    nextFree = lastUsed.dpid() == -1 ? 0 : 1;
-                    break;
-                }
-
-                VirtualPage firstUnused = pages.get(start);
-                while (start < end && firstUnused.dpid() != -1) {
-                    ++start;
-                    firstUnused = pages.get(start);
-                }
-
-                if (start >= end) {
-                    break;
-                }
-
-                Collections.swap(pages, start, end);
-                nextFree = end;
-                --end;
-                ++start;
-            }
+    private void recycle(VirtualPage page) {
+        // Recycle only if:
+        // 1. the page is not a large page, and
+        // 2. overall allocation is not above budget
+        if (used.get() < pageBudget && !page.isLargePage()) {
+            page.reset();
+            freePages.offer(page);
         }
     }
 
     @Override
-    public ICachedPage tryPin(long dpid) throws HyracksDataException {
-        return pin(dpid, false);
-    }
-
-    @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
-        VirtualPage page = null;
+        VirtualPage page;
         int hash = hash(dpid);
         CacheBucket bucket = buckets[hash];
-
         bucket.bucketLock.lock();
         try {
             page = bucket.cachedPage;
@@ -197,15 +217,15 @@
                 }
                 page = page.next();
             }
-
             if (!newPage) {
+                int fileId = BufferedFileHandle.getFileId(dpid);
+                FileReference fileRef;
                 synchronized (fileMapManager) {
-                    throw new HyracksDataException(
-                            "Page " + BufferedFileHandle.getPageId(dpid) + " does not exist in file "
-                                    + fileMapManager.lookupFileName(BufferedFileHandle.getFileId(dpid)));
+                    fileRef = fileMapManager.lookupFileName(fileId);
                 }
+                throw HyracksDataException.create(ErrorCode.PAGE_DOES_NOT_EXIST_IN_FILE,
+                        BufferedFileHandle.getPageId(dpid), fileRef);
             }
-
             page = getOrAllocPage(dpid);
             page.next(bucket.cachedPage);
             bucket.cachedPage = page;
@@ -222,18 +242,13 @@
     }
 
     private VirtualPage getOrAllocPage(long dpid) {
-        VirtualPage page;
-        synchronized (pages) {
-            if (nextFree >= pages.size()) {
-                page = new VirtualPage(allocator.allocate(pageSize, 1)[0], pageSize);
-                page.multiplier(1);
-                pages.add(page);
-            } else {
-                page = pages.get(nextFree);
-            }
-            ++nextFree;
-            page.dpid(dpid);
+        VirtualPage page = freePages.poll();
+        if (page == null) {
+            page = new VirtualPage(allocator.allocate(pageSize, 1)[0], pageSize);
+            page.multiplier(1);
         }
+        page.dpid(dpid);
+        used.incrementAndGet();
         return page;
     }
 
@@ -245,10 +260,26 @@
             // no-op
             return;
         }
+        // Maintain the usage counters.
+        // In addition, discard pre-allocated free pages to cover the extra pages of the large page.
+        // This is done before the actual resizing so that buffers freed here can be GC'ed
+        // before the new, larger buffer is allocated.
         if (origMultiplier == 1) {
-            synchronized (pages) {
-                pages.remove(cPage);
-                nextFree--;
+            largePages.getAndAdd(multiplier);
+            int diff = multiplier - 1;
+            used.getAndAdd(diff);
+            for (int i = 0; i < diff; i++) {
+                freePages.poll();
+            }
+        } else if (multiplier == 1) {
+            largePages.getAndAdd(-origMultiplier);
+            used.addAndGet(-origMultiplier + 1);
+        } else {
+            int diff = multiplier - origMultiplier;
+            largePages.getAndAdd(diff);
+            used.getAndAdd(diff);
+            for (int i = 0; i < diff; i++) {
+                freePages.poll();
             }
         }
         ByteBuffer newBuffer = allocator.allocate(pageSize * multiplier, 1)[0];
@@ -257,15 +288,6 @@
             oldBuffer.limit(newBuffer.capacity());
         }
         newBuffer.put(oldBuffer);
-        if (origMultiplier == 1) {
-            largePages.getAndAdd(multiplier);
-        } else if (multiplier == 1) {
-            largePages.getAndAdd(-origMultiplier);
-            pages.add(0, (VirtualPage) cPage);
-            nextFree++;
-        } else {
-            largePages.getAndAdd(multiplier - origMultiplier);
-        }
         ((VirtualPage) cPage).buffer(newBuffer);
         ((VirtualPage) cPage).multiplier(multiplier);
     }
@@ -275,7 +297,8 @@
     }
 
     @Override
-    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+    public void flush(ICachedPage page) throws HyracksDataException {
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -283,59 +306,50 @@
     }
 
     @Override
-    public int getPageSize() {
-        return pageSize;
-    }
-
-    @Override
-    public int getPageSizeWithHeader() {
-        return pageSize;
-    }
-
-    @Override
-    public int getNumPages() {
-        return numPages;
-    }
-
-    @Override
     public void open() throws HyracksDataException {
         if (open) {
-            throw new HyracksDataException("Failed to open virtual buffercache since it is already open.");
+            throw HyracksDataException.create(ErrorCode.VBC_ALREADY_OPEN);
         }
-        pages.trimToSize();
-        pages.ensureCapacity(numPages + OVERFLOW_PADDING);
-        allocator.reserveAllocation(pageSize, numPages);
-        for (int i = 0; i < numPages; i++) {
+        allocator.reserveAllocation(pageSize, pageBudget);
+        for (int i = 0; i < pageBudget; i++) {
             buckets[i] = new CacheBucket();
         }
-        nextFree = 0;
         largePages.set(0);
+        used.set(0);
         open = true;
     }
 
     @Override
     public void reset() {
-        for (int i = 0; i < numPages; i++) {
-            buckets[i].cachedPage = null;
-        }
-        int excess = pages.size() - numPages;
-        if (excess > 0) {
-            for (int i = numPages + excess - 1; i >= numPages; i--) {
-                pages.remove(i);
+        recycleAllPages();
+        used.set(0);
+        largePages.set(0);
+    }
+
+    private void recycleAllPages() {
+        for (int i = 0; i < buckets.length; i++) {
+            final CacheBucket bucket = buckets[i];
+            bucket.bucketLock.lock();
+            try {
+                VirtualPage curr = bucket.cachedPage;
+                while (curr != null) {
+                    bucket.cachedPage = curr.next();
+                    recycle(curr);
+                    curr = bucket.cachedPage;
+                }
+            } finally {
+                bucket.bucketLock.unlock();
             }
         }
-        nextFree = 0;
-        largePages.set(0);
     }
 
     @Override
     public void close() throws HyracksDataException {
         if (!open) {
-            throw new HyracksDataException("Failed to close virtual buffercache since it is already closed.");
+            throw HyracksDataException.create(ErrorCode.VBC_ALREADY_CLOSED);
         }
-
-        pages.clear();
-        for (int i = 0; i < numPages; i++) {
+        freePages.clear();
+        for (int i = 0; i < pageBudget; i++) {
             buckets[i].cachedPage = null;
         }
         open = false;
@@ -343,11 +357,11 @@
 
     public String dumpState() {
         StringBuilder sb = new StringBuilder();
-        sb.append(String.format("Page size = %d\n", pageSize));
-        sb.append(String.format("Capacity = %d\n", numPages));
-        sb.append(String.format("Allocated pages = %d\n", pages.size()));
-        sb.append(String.format("Allocated large pages = %d\n", largePages.get()));
-        sb.append(String.format("Next free page = %d\n", nextFree));
+        sb.append(String.format("Page size = %d%n", pageSize));
+        sb.append(String.format("Page budget = %d%n", pageBudget));
+        sb.append(String.format("Used pages = %d%n", used.get()));
+        sb.append(String.format("Used large pages = %d%n", largePages.get()));
+        sb.append(String.format("Available free pages = %d%n", freePages.size()));
         return sb.toString();
     }
 
@@ -356,11 +370,6 @@
         return fileMapManager;
     }
 
-    @Override
-    public boolean isFull() {
-        return (nextFree + largePages.get()) >= numPages;
-    }
-
     private static class CacheBucket {
         private final ReentrantLock bucketLock;
         private VirtualPage cachedPage;
@@ -376,14 +385,6 @@
     }
 
     @Override
-    public void adviseWontNeed(ICachedPage page) {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.log(Level.INFO, "Calling adviseWontNeed on " + this.getClass().getName()
-                    + " makes no sense as this BufferCache cannot evict pages");
-        }
-    }
-
-    @Override
     public void returnPage(ICachedPage page) {
 
     }
@@ -410,11 +411,6 @@
     }
 
     @Override
-    public void setPageDiskId(ICachedPage page, long dpid) {
-
-    }
-
-    @Override
     public void returnPage(ICachedPage page, boolean reinsert) {
         throw new UnsupportedOperationException("Virtual buffer caches don't have FIFO writers");
     }
@@ -449,7 +445,7 @@
         map.put("class", getClass().getSimpleName());
         map.put("allocator", allocator.toString());
         map.put("pageSize", pageSize);
-        map.put("numPages", numPages);
+        map.put("pageBudget", pageBudget);
         map.put("open", open);
         return map;
     }
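The switch from a defragmented ArrayList holding every allocated page to a bounded queue holding only free pages is what fixes the reclamation leak: pages over budget are simply not re-offered, so their buffers become unreachable and can be garbage collected. A minimal sketch of the pattern, with ByteBuffer standing in for VirtualPage and all names illustrative:

    import java.nio.ByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    class PagePoolSketch {
        private final int pageSize;
        private final int pageBudget;
        private final BlockingQueue<ByteBuffer> freePages; // bounded by the budget
        private final AtomicInteger used = new AtomicInteger();

        PagePoolSketch(int pageSize, int pageBudget) {
            this.pageSize = pageSize;
            this.pageBudget = pageBudget;
            this.freePages = new ArrayBlockingQueue<>(pageBudget);
        }

        ByteBuffer take() {
            ByteBuffer page = freePages.poll(); // reuse a recycled page if one is available
            if (page == null) {
                page = ByteBuffer.allocate(pageSize); // otherwise allocate a fresh one
            }
            used.incrementAndGet();
            return page;
        }

        void recycle(ByteBuffer page) {
            used.decrementAndGet();
            if (used.get() < pageBudget) {
                page.clear();
                freePages.offer(page); // keep the page for reuse
            }
            // over budget: drop the reference so the buffer can be garbage collected
        }
    }

The resizePage() accounting keeps the same invariant: growing a page from multiplier 1 to, say, 52 adds 51 to used, adds 52 to largePages, and polls up to 51 pre-allocated pages off the free queue so the budget is not counted twice.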
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index cf40a7a..77dc751 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -147,7 +147,7 @@
     @Override
     public long getMemoryAllocationSize() {
         IBufferCache virtualBufferCache = btree.getBufferCache();
-        return virtualBufferCache.getNumPages() * virtualBufferCache.getPageSize();
+        return (long) virtualBufferCache.getPageBudget() * virtualBufferCache.getPageSize();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index ee7c827..750a2fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -762,7 +762,7 @@
     }
 
     public RTreeAccessor createAccessor(IModificationOperationCallback modificationCallback,
-                                             ISearchOperationCallback searchCallback, int[] nonIndexFields) {
+            ISearchOperationCallback searchCallback, int[] nonIndexFields) {
         return new RTreeAccessor(this, modificationCallback, searchCallback, nonIndexFields);
     }
 
@@ -1008,7 +1008,7 @@
 
                     int finalPageId = freePageManager.takePage(metaFrame);
                     n.pageId = finalPageId;
-                    bufferCache.setPageDiskId(n.page, BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
+                    n.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
                     //else we are looking at a leaf
                 }
                 //set next guide MBR
@@ -1071,9 +1071,8 @@
                 } else {
                     prevNodeFrontierPages.set(level, finalPageId);
                 }
-                bufferCache.setPageDiskId(frontier.page, BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
+                frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
                 pagesToWrite.add(frontier.page);
-
                 lowerFrame = prevInteriorFrame;
                 lowerFrame.setPage(frontier.page);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index fcea8e0..d0f4965 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -138,7 +138,7 @@
     }
 
     @Override
-    public int getNumPages() {
+    public int getPageBudget() {
         return pageReplacementStrategy.getMaxAllowedNumPages();
     }
 
@@ -161,33 +161,6 @@
     }
 
     @Override
-    public ICachedPage tryPin(long dpid) throws HyracksDataException {
-        // Calling the pinSanityCheck should be used only for debugging, since
-        // the synchronized block over the fileInfoMap is a hot spot.
-        if (DEBUG) {
-            pinSanityCheck(dpid);
-        }
-        CachedPage cPage = null;
-        int hash = hash(dpid);
-        CacheBucket bucket = pageMap[hash];
-        bucket.bucketLock.lock();
-        try {
-            cPage = bucket.cachedPage;
-            while (cPage != null) {
-                if (cPage.dpid == dpid) {
-                    cPage.pinCount.incrementAndGet();
-                    pageReplacementStrategy.notifyCachePageAccess(cPage);
-                    return cPage;
-                }
-                cPage = cPage.next;
-            }
-        } finally {
-            bucket.bucketLock.unlock();
-        }
-        return cPage;
-    }
-
-    @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
         // Calling the pinSanityCheck should be used only for debugging, since
         // the synchronized block over the fileInfoMap is a hot spot.
@@ -978,7 +951,7 @@
     }
 
     @Override
-    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+    public void flush(ICachedPage page) throws HyracksDataException {
         // Assumes the caller has pinned the page.
         cleanerThread.cleanPage((CachedPage) page, true);
     }
@@ -1177,11 +1150,6 @@
     }
 
     @Override
-    public void adviseWontNeed(ICachedPage page) {
-        pageReplacementStrategy.adviseWontNeed((ICachedPageInternal) page);
-    }
-
-    @Override
     public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
         return confiscatePage(dpid, 1);
     }
@@ -1329,17 +1297,21 @@
                 finishQueue();
                 if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) {
                     cycleCount = 0; // suppress warning below
-                    throw new HyracksDataException("Unable to find free page in buffer cache after "
-                            + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)"
-                            + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount)
-                                    + " successful pins since start of cycle" : ""));
+                    throw new HyracksDataException(
+                            "Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES
+                                    + " cycles (buffer cache undersized?)" + (DEBUG
+                                            ? " ; " + (masterPinCount.get() - startingPinCount)
+                                                    + " successful pins since start of cycle"
+                                            : ""));
                 }
             }
         } finally {
             if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isLoggable(Level.WARNING)) {
                 LOGGER.warning("Took " + cycleCount + " cycles to find free page in buffer cache.  (buffer cache "
-                        + "undersized?)" + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount)
-                                + " successful pins since start of cycle" : ""));
+                        + "undersized?)" + (DEBUG
+                                ? " ; " + (masterPinCount.get() - startingPinCount)
+                                        + " successful pins since start of cycle"
+                                : ""));
             }
         }
     }
@@ -1402,13 +1374,8 @@
     }
 
     @Override
-    public void setPageDiskId(ICachedPage page, long dpid) {
-        ((CachedPage) page).dpid = dpid;
-    }
-
-    @Override
     public IFIFOPageQueue createFIFOQueue() {
-        return fifoWriter.createQueue(FIFOLocalWriter.instance());
+        return fifoWriter.createQueue(FIFOLocalWriter.INSTANCE);
     }
 
     @Override
@@ -1430,10 +1397,6 @@
     }
 
     @Override
-    /**
-     * _ONLY_ call this if you absolutely, positively know this file has no dirty pages in the cache!
-     * Bypasses the normal lifecycle of a file handle and evicts all references to it immediately.
-     */
     public void purgeHandle(int fileId) throws HyracksDataException {
         synchronized (fileInfoMap) {
             BufferedFileHandle fh = fileInfoMap.get(fileId);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 76bbd4c..bc0a04e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -46,7 +46,7 @@
     private final StackTraceElement[] ctorStack;
 
     //Constructor for making dummy entry for FIFO queue
-    public CachedPage(){
+    public CachedPage() {
         this.cpid = -1;
         this.buffer = null;
         this.pageReplacementStrategy = null;
@@ -55,7 +55,7 @@
         pinCount = null;
         queueInfo = null;
         replacementStrategyObject = null;
-        latch =null;
+        latch = null;
         ctorStack = DEBUG ? new Throwable().getStackTrace() : null;
     }
 
@@ -195,4 +195,14 @@
     void setNext(CachedPage next) {
         this.next = next;
     }
+
+    @Override
+    public void setDiskPageId(long dpid) {
+        this.dpid = dpid;
+    }
+
+    @Override
+    public boolean isLargePage() {
+        return multiplier > 1;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
index 8f7a965..f3de1c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -77,11 +77,6 @@
     }
 
     @Override
-    public ICachedPage tryPin(long dpid) throws HyracksDataException {
-        return bufferCache.tryPin(dpid);
-    }
-
-    @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
         ICachedPage page = bufferCache.pin(dpid, newPage);
         pinCount.addAndGet(1);
@@ -105,8 +100,8 @@
     }
 
     @Override
-    public int getNumPages() {
-        return bufferCache.getNumPages();
+    public int getPageBudget() {
+        return bufferCache.getPageBudget();
     }
 
     @Override
@@ -168,8 +163,8 @@
     }
 
     @Override
-    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
-        bufferCache.flushDirtyPage(page);
+    public void flush(ICachedPage page) throws HyracksDataException {
+        bufferCache.flush(page);
     }
 
     @Override
@@ -183,11 +178,6 @@
     }
 
     @Override
-    public void adviseWontNeed(ICachedPage page) {
-        bufferCache.adviseWontNeed(page);
-    }
-
-    @Override
     public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
         return bufferCache.confiscatePage(dpid);
     }
@@ -214,12 +204,6 @@
     }
 
     @Override
-    public void setPageDiskId(ICachedPage page, long dpid) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
     public void returnPage(ICachedPage page, boolean reinsert) {
         // TODO Auto-generated method stub
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 6774ddd..9d0b728 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -18,19 +18,15 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class FIFOLocalWriter implements IFIFOPageWriter {
-    private static FIFOLocalWriter instance;
+    public static final FIFOLocalWriter INSTANCE = new FIFOLocalWriter();
     private static boolean DEBUG = false;
 
-    public static FIFOLocalWriter instance() {
-        if(instance == null) {
-            instance = new FIFOLocalWriter();
-        }
-        return instance;
+    private FIFOLocalWriter() {
     }
 
     @Override
     public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException {
-        CachedPage cPage = (CachedPage)page;
+        CachedPage cPage = (CachedPage) page;
         try {
             bufferCache.write(cPage);
         } finally {
@@ -43,6 +39,6 @@
 
     @Override
     public void sync(int fileId, BufferCache bufferCache) throws HyracksDataException {
-        bufferCache.force(fileId,true);
+        bufferCache.force(fileId, true);
     }
 }
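Replacing the lazily created instance() with an eagerly initialized INSTANCE also removes a latent race: the old unsynchronized null-check could let two threads each construct a writer. Eager initialization is safe because the JVM guarantees a class is initialized exactly once. The idiom, as a generic sketch not tied to Hyracks types:

    final class EagerSingleton {
        // Thread-safe by the JVM's class-initialization guarantee.
        public static final EagerSingleton INSTANCE = new EagerSingleton();

        private EagerSingleton() {
            // private constructor prevents additional instances
        }
    }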
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
index 789f7b7..28801ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
@@ -88,50 +88,185 @@
      */
     void deleteFile(FileReference file) throws HyracksDataException;
 
-    ICachedPage tryPin(long dpid) throws HyracksDataException;
-
+    /**
+     * Pin the page so it can't be evicted from the buffer cache.
+     *
+     * @param dpid
+     *            a unique page id that combines the file id and the page id
+     * @param newPage
+     *            whether this page is expected to be new.
+     *            NOTE: behavior is undefined
+     *            -- if the flag is true but the page already exists
+     *            -- if the flag is false but the page doesn't exist
+     * @return the pinned page
+     * @throws HyracksDataException
+     */
     ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
 
+    /**
+     * Unpin a pinned page so its buffer can be recycled
+     *
+     * @param page
+     *            the page
+     * @throws HyracksDataException
+     */
     void unpin(ICachedPage page) throws HyracksDataException;
 
-    void flushDirtyPage(ICachedPage page) throws HyracksDataException;
+    /**
+     * Flush the page if it is dirty
+     *
+     * @param page
+     *            the page to flush
+     * @throws HyracksDataException
+     */
+    void flush(ICachedPage page) throws HyracksDataException;
 
-    void adviseWontNeed(ICachedPage page);
-
-    ICachedPage confiscatePage(long dpid) throws HyracksDataException;
-
-    ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId) throws HyracksDataException;
-
-    void returnPage(ICachedPage page);
-
-    void returnPage(ICachedPage page, boolean reinsert);
-
+    /**
+     * Force bits that have already been written to reach the disk.
+     * This method doesn't flush dirty pages; it simply calls the sync method on the file system API
+     *
+     * @param fileId
+     *            the file id
+     * @param metadata
+     *            whether metadata should be synced as well
+     * @throws HyracksDataException
+     */
     void force(int fileId, boolean metadata) throws HyracksDataException;
 
+    /**
+     * Take a page such that no one else has access to it
+     *
+     * @param dpid
+     *            the unique (fileId,pageId)
+     * @return the confiscated page or null if no page is available
+     * @throws HyracksDataException
+     */
+    ICachedPage confiscatePage(long dpid) throws HyracksDataException;
+
+    /**
+     * Take a large page such that no one else has access to it
+     *
+     * @param dpid
+     *            the unique (fileId,pageId)
+     * @param multiplier
+     *            how many multiples of the original page size
+     * @param extraBlockPageId
+     *            the page id where the large block comes from
+     * @return
+     *         the confiscated page or null if a large page couldn't be found
+     * @throws HyracksDataException
+     */
+    ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId) throws HyracksDataException;
+
+    /**
+     * Return and re-insert a confiscated page
+     *
+     * @param page
+     *            the confiscated page
+     */
+    void returnPage(ICachedPage page);
+
+    /**
+     * Return a confiscated page
+     *
+     * @param page
+     *            the confiscated page
+     * @param reinsert
+     *            if true, return the page to the cache, otherwise, destroy
+     */
+    void returnPage(ICachedPage page, boolean reinsert);
+
+    /**
+     * Get the standard page size
+     *
+     * @return the size in bytes
+     */
     int getPageSize();
 
+    /**
+     * Get the standard page size with header if any
+     *
+     * @return the sum of page size and header size in bytes
+     */
     int getPageSizeWithHeader();
 
-    int getNumPages();
+    /**
+     * @return the maximum number of pages allowed in this buffer cache
+     */
+    int getPageBudget();
 
+    /**
+     * Get the number of pages used for a file
+     *
+     * @param fileId
+     *            the file id
+     * @return the number of pages used for the file
+     * @throws HyracksDataException
+     */
     int getNumPagesOfFile(int fileId) throws HyracksDataException;
 
+    /**
+     * Get the reference count for a file (num of open - num of close)
+     *
+     * @param fileId
+     *            the file
+     * @return the reference count
+     */
     int getFileReferenceCount(int fileId);
 
+    /**
+     * Close the buffer cache, all of its files, and release the memory taken by it
+     * The buffer cache is open upon successful instantiation and can't be re-opened
+     *
+     * @throws HyracksDataException
+     */
     void close() throws HyracksDataException;
 
+    /**
+     * @return an instance of {@link IFIFOPageQueue} that can be used to write pages to the file
+     */
     IFIFOPageQueue createFIFOQueue();
 
+    /**
+     * Flush the queued pages written through buffer cache FIFO queues
+     */
     void finishQueue();
 
-    void setPageDiskId(ICachedPage page, long dpid);
-
+    // TODO: remove replication from the buffer cache interface
+    /**
+     * @return true if replication is enabled, false otherwise
+     */
     boolean isReplicationEnabled();
 
+    /**
+     * @return the io replication manager
+     */
     IIOReplicationManager getIOReplicationManager();
 
+    /**
+     * Deletes the file and recycles all of its pages without flushing them.
+     *
+     * ONLY call this if you absolutely, positively know this file has no dirty pages in the cache!
+     * Bypasses the normal lifecycle of a file handle and evicts all references to it immediately.
+     */
     void purgeHandle(int fileId) throws HyracksDataException;
 
+    /**
+     * Resize the page
+     *
+     * @param page
+     *            the page to resize
+     * @param multiplier
+     *            how many multiples of the original page size
+     * @param extraPageBlockHelper
+     *            helper to determine the location of the resize block
+     * @throws HyracksDataException
+     */
     void resizePage(ICachedPage page, int multiplier, IExtraPageBlockHelper extraPageBlockHelper)
             throws HyracksDataException;
 }
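The javadocs above imply a strict pairing discipline: every pin must be matched by an unpin (and every confiscated page must be returned), otherwise the cache's budget leaks. A usage sketch, assuming bufferCache and dpid are available in the surrounding code:

    // Illustrative pin/unpin pairing for the IBufferCache API documented above.
    ICachedPage page = bufferCache.pin(dpid, false);
    try {
        java.nio.ByteBuffer frame = page.getBuffer();
        // ... read the frame contents here ...
    } finally {
        bufferCache.unpin(page); // always unpin, even on exception paths
    }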
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
index abbe233..16837b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
@@ -41,4 +41,13 @@
     int getPageSize();
 
     int getFrameSizeMultiplier();
+
+    void setDiskPageId(long dpid);
+
+    /**
+     * Check if a page is a large page
+     *
+     * @return true if the page is large, false otherwise
+     */
+    boolean isLargePage();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java
index ad7f2f6..607385a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java
@@ -21,6 +21,25 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IExtraPageBlockHelper {
+    /**
+     * Get the page id of a free block of the given size
+     *
+     * @param size
+     *            the size of the block
+     * @return
+     *         the page id
+     * @throws HyracksDataException
+     */
     int getFreeBlock(int size) throws HyracksDataException;
+
+    /**
+     * Release the block of the given size at location blockPageId
+     *
+     * @param blockPageId
+     *            the block page id
+     * @param size
+     *            the size of the block
+     * @throws HyracksDataException
+     */
     void returnFreePageBlock(int blockPageId, int size) throws HyracksDataException;
 }
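A minimal in-memory implementation sketch of this interface, which hands out blocks by growing an append-only page count and never reclaims returned blocks (the TestExtraPageBlockHelper added to VirtualBufferCacheTest later in this change behaves similarly; the class and field names here are illustrative):

    import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;

    class AppendOnlyBlockHelper implements IExtraPageBlockHelper {
        private int totalNumPages; // pages handed out so far

        @Override
        public int getFreeBlock(int size) {
            int first = totalNumPages;
            totalNumPages += size; // reserve `size` pages at the end of the file
            return first;
        }

        @Override
        public void returnFreePageBlock(int blockPageId, int size) {
            // append-only: freed blocks are never reused in this sketch
        }
    }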
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
index 0fe5767..6c03671 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
@@ -17,6 +17,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+@FunctionalInterface
 public interface IFIFOPageQueue {
-    public void put(ICachedPage page) throws HyracksDataException;
+    void put(ICachedPage page) throws HyracksDataException;
 }
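With a single abstract method, the @FunctionalInterface annotation also documents that the queue can be implemented as a lambda, which is convenient for test stubs (a hypothetical sketch):

    // Hypothetical test stub: a FIFO queue that silently discards pages.
    IFIFOPageQueue dropAll = page -> {
        // intentionally empty
    };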
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
index cfca77a..139a3c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
@@ -121,4 +121,14 @@
         this.buffer = buffer;
     }
 
+    @Override
+    public void setDiskPageId(long dpid) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isLargePage() {
+        return multiplier > 1;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java
index 5ff5a11..0e749fc 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java
@@ -25,49 +25,139 @@
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
 import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class VirtualBufferCacheTest {
-    private static final long SEED = 123456789L;
-    private static final int NUM_OVERPIN = 128;
-    private static final int PAGE_SIZE = 256;
-    private static final int NUM_FILES = 10;
-    private static final int NUM_PAGES = 1000;
+    /*
+     * Missing tests:
+     * 0. concurrent pins for a single file from multiple threads
+     * 1. concurrent file creation
+     * 2. file deletion while pages are pinned? Note that currently, the vbc doesn't keep track of the number of pins
+     */
+    private static class TestExtraPageBlockHelper implements IExtraPageBlockHelper {
+        private final int fileId;
+        private int pinCount;
+        private Set<ICachedPage> pinnedPages;
+        private int totalNumPages;
 
-    private final Random random;
-    private final FileState[] fileStates;
-
-    private VirtualBufferCache vbc;
-    private IOManager ioManager;
-
-    public VirtualBufferCacheTest() {
-        fileStates = new FileState[NUM_FILES];
-        for (int i = 0; i < NUM_FILES; i++) {
-            fileStates[i] = new FileState();
+        public TestExtraPageBlockHelper(int fileId) {
+            this.fileId = fileId;
+            pinCount = 0;
+            pinnedPages = new HashSet<>();
         }
-        random = new Random(SEED);
-        vbc = null;
+
+        @Override
+        public int getFreeBlock(int size) throws HyracksDataException {
+            int before = totalNumPages;
+            totalNumPages += size - 1;
+            return before;
+        }
+
+        @Override
+        public void returnFreePageBlock(int blockPageId, int size) throws HyracksDataException {
+            // Do nothing. We don't reclaim large pages from the file in this test
+        }
+
+        public void pin(VirtualBufferCache vbc, int multiplier) throws HyracksDataException {
+            ICachedPage p = vbc.pin(BufferedFileHandle.getDiskPageId(fileId, pinCount), true);
+            pinnedPages.add(p);
+            pinCount++;
+            totalNumPages++;
+            if (multiplier > 1) {
+                vbc.resizePage(p, multiplier, this);
+            }
+        }
+
     }
 
     private static class FileState {
-        private int fileId;
+        private final VirtualBufferCache vbc;
+        private final int fileId;
+        private final TestExtraPageBlockHelper helper;
         private FileReference fileRef;
-        private int pinCount;
-        private Set<ICachedPage> pinnedPages;
 
-        public FileState() {
-            fileId = -1;
-            fileRef = null;
-            pinCount = 0;
-            pinnedPages = new HashSet<>();
+        public FileState(VirtualBufferCache vbc, String fileName) throws HyracksDataException {
+            this.vbc = vbc;
+            IOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+            fileRef = ioManager.resolve(fileName);
+            vbc.createFile(fileRef);
+            fileId = vbc.getFileMapProvider().lookupFileId(fileRef);
+            helper = new TestExtraPageBlockHelper(fileId);
+        }
+
+        public void pin(int multiplier) throws HyracksDataException {
+            helper.pin(vbc, multiplier);
+        }
+    }
+
+    private static class Request {
+        private enum Type {
+            PIN_PAGE,
+            CALLBACK
+        }
+
+        private final Type type;
+        private boolean done;
+
+        public Request(Type type) {
+            this.type = type;
+            done = false;
+        }
+
+        Type getType() {
+            return type;
+        }
+
+        synchronized void complete() {
+            done = true;
+            notifyAll();
+        }
+
+        synchronized void await() throws InterruptedException {
+            while (!done) {
+                wait();
+            }
+        }
+    }
+
+    public class User extends SingleThreadEventProcessor<Request> {
+        private final VirtualBufferCache vbc;
+        private final FileState fileState;
+
+        public User(String name, VirtualBufferCache vbc, FileState fileState) throws HyracksDataException {
+            super(name);
+            this.vbc = vbc;
+            this.fileState = fileState;
+        }
+
+        @Override
+        protected void handle(Request req) throws Exception {
+            try {
+                switch (req.getType()) {
+                    case PIN_PAGE:
+                        ICachedPage p = vbc.pin(
+                                BufferedFileHandle.getDiskPageId(fileState.fileId, fileState.helper.pinCount), true);
+                        fileState.helper.pinnedPages.add(p);
+                        ++fileState.helper.pinCount;
+                        break;
+                    default:
+                        break;
+                }
+            } finally {
+                req.complete();
+            }
         }
     }
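The Request class above is a hand-rolled, single-use completion latch built on intrinsic wait/notifyAll. For comparison only, the same behavior can be expressed with java.util.concurrent (a sketch, not part of this change):

    import java.util.concurrent.CountDownLatch;

    class LatchRequest {
        private final CountDownLatch done = new CountDownLatch(1);

        void complete() {
            done.countDown(); // releases all current and future waiters
        }

        void await() throws InterruptedException {
            done.await(); // blocks until complete() has been called once
        }
    }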
 
@@ -79,60 +169,184 @@
      * of pages.
      */
     @Test
-    public void test01() throws Exception {
-        TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, NUM_FILES);
-        ioManager = TestStorageManagerComponentHolder.getIOManager();
+    public void testDisjointPins() throws Exception {
+        final int numOverpin = 128;
+        final int pageSize = 256;
+        final int numFiles = 10;
+        final int numPages = 1000;
+        Random random = new Random();
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
-        vbc = new VirtualBufferCache(allocator, PAGE_SIZE, NUM_PAGES);
+        VirtualBufferCache vbc = new VirtualBufferCache(allocator, pageSize, numPages);
         vbc.open();
-        createFiles();
+        FileState[] fileStates = new FileState[numFiles];
+        for (int i = 0; i < numFiles; i++) {
+            fileStates[i] = new FileState(vbc, String.format("f%d", i));
+        }
 
-        kPins(NUM_PAGES);
-        assertTrue(pagesDisjointed());
+        kPins(numPages, numFiles, fileStates, vbc, random);
+        assertTrue(pagesDisjointed(numFiles, fileStates));
 
-        kPins(NUM_OVERPIN);
-        assertTrue(pagesDisjointed());
+        kPins(numOverpin, numFiles, fileStates, vbc, random);
+        assertTrue(pagesDisjointed(numFiles, fileStates));
 
-        deleteFiles();
+        deleteFilesAndCheckMemory(numFiles, fileStates, vbc);
         vbc.close();
     }
 
-    private boolean pagesDisjointed() {
+    @Test
+    public void testConcurrentUsersDifferentFiles() throws Exception {
+        final int numOverpin = 128;
+        final int pageSize = 256;
+        final int numFiles = 10;
+        final int numPages = 1000;
+        Random random = new Random();
+        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+        VirtualBufferCache vbc = new VirtualBufferCache(allocator, pageSize, numPages);
+        vbc.open();
+        FileState[] fileStates = new FileState[numFiles];
+        User[] users = new User[numFiles];
+        for (int i = 0; i < numFiles; i++) {
+            fileStates[i] = new FileState(vbc, String.format("f%d", i));
+            users[i] = new User("User-" + i, vbc, fileStates[i]);
+        }
+        for (int i = 0; i < numPages; i++) {
+            int fsIdx = random.nextInt(numFiles);
+            users[fsIdx].add(new Request(Request.Type.PIN_PAGE));
+        }
+        // ensure all are done
+        wait(users);
+        assertTrue(pagesDisjointed(numFiles, fileStates));
+        for (int i = 0; i < numOverpin; i++) {
+            int fsIdx = random.nextInt(numFiles);
+            users[fsIdx].add(new Request(Request.Type.PIN_PAGE));
+        }
+        // ensure all are done
+        wait(users);
+        assertTrue(pagesDisjointed(numFiles, fileStates));
+        // shutdown users
+        shutdown(users);
+        deleteFilesAndCheckMemory(numFiles, fileStates, vbc);
+        vbc.close();
+    }
+
+    private void shutdown(User[] users) throws HyracksDataException, InterruptedException {
+        for (int i = 0; i < users.length; i++) {
+            users[i].stop();
+        }
+    }
+
+    private void wait(User[] users) throws InterruptedException {
+        for (int i = 0; i < users.length; i++) {
+            Request callback = new Request(Request.Type.CALLBACK);
+            users[i].add(callback);
+            callback.await();
+        }
+    }
+
+    @Test
+    public void testLargePages() throws Exception {
+        final int pageSize = 256;
+        final int numFiles = 3;
+        final int numPages = 1000;
+        ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+        VirtualBufferCache vbc = new VirtualBufferCache(allocator, pageSize, numPages);
+        vbc.open();
+        FileState[] fileStates = new FileState[numFiles];
+        for (int i = 0; i < numFiles; i++) {
+            fileStates[i] = new FileState(vbc, String.format("f%d", i));
+        }
+        // Pin a single large page that spans 52 regular pages
+        int fileIdx = 0;
+        FileState f = fileStates[fileIdx];
+        f.pin(52);
+        // Assert that 52 pages are accounted for
+        Assert.assertEquals(52, vbc.getUsage());
+        // Delete file
+        vbc.deleteFile(f.fileId);
+        // Assert that usage dropped back to 0
+        Assert.assertEquals(0, vbc.getUsage());
+        // Assert that no pages are pre-allocated
+        Assert.assertEquals(0, vbc.getPreAllocatedPages());
+        // Next file
+        fileIdx++;
+        f = fileStates[fileIdx];
+        // Pin regular pages one at a time until usage exceeds the budget
+        int count = 0;
+        while (vbc.getUsage() <= vbc.getPageBudget()) {
+            f.pin(1);
+            count++;
+            Assert.assertEquals(count, vbc.getUsage());
+        }
+        // Delete file
+        vbc.deleteFile(f.fileRef);
+        // Assert that usage dropped back to 0
+        Assert.assertEquals(0, vbc.getUsage());
+        // Assert that the freed small pages were kept as pre-allocated free pages (capped at the budget)
+        Assert.assertEquals(vbc.getPageBudget(), vbc.getPreAllocatedPages());
+        // Next file
+        fileIdx++;
+        f = fileStates[fileIdx];
+        count = 0;
+        int sizeOfLargePage = 4;
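+        // Pin 4-frame large pages; each allocation should consume cached free
+        // pages first, so the pre-allocated count shrinks as usage grows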
+        while (vbc.getUsage() <= vbc.getPageBudget()) {
+            f.pin(sizeOfLargePage);
+            count += sizeOfLargePage;
+            Assert.assertEquals(count, vbc.getUsage());
+            Assert.assertEquals(Integer.max(0, vbc.getPageBudget() - count), vbc.getPreAllocatedPages());
+        }
+        // Delete file
+        vbc.deleteFile(f.fileId);
+        // Assert that usage dropped back to 0
+        Assert.assertEquals(0, vbc.getUsage());
+        // Assert that no pages are pre-allocated
+        Assert.assertEquals(0, vbc.getPreAllocatedPages());
+        vbc.close();
+    }
+
+    private boolean pagesDisjointed(int numFiles, FileState[] fileStates) {
         boolean disjoint = true;
-        for (int i = 0; i < NUM_FILES; i++) {
+        for (int i = 0; i < numFiles; i++) {
             FileState fi = fileStates[i];
-            for (int j = i + 1; j < NUM_FILES; j++) {
+            for (int j = i + 1; j < numFiles; j++) {
                 FileState fj = fileStates[j];
-                disjoint = disjoint && Collections.disjoint(fi.pinnedPages, fj.pinnedPages);
+                disjoint = disjoint && Collections.disjoint(fi.helper.pinnedPages, fj.helper.pinnedPages);
             }
         }
         return disjoint;
     }
 
-    private void createFiles() throws Exception {
-        for (int i = 0; i < NUM_FILES; i++) {
-            FileState f = fileStates[i];
-            String fName = String.format("f%d", i);
-            f.fileRef = ioManager.resolve(fName);
-            vbc.createFile(f.fileRef);
-            f.fileId = vbc.getFileMapProvider().lookupFileId(f.fileRef);
+    private void deleteFilesAndCheckMemory(int numFiles, FileState[] fileStates, VirtualBufferCache vbc)
+            throws Exception {
+        // Sum the pages currently pinned across all file states
+        int totalInStates = 0;
+        for (int i = 0; i < numFiles; i++) {
+            totalInStates += fileStates[i].helper.pinnedPages.size();
         }
-    }
-
-    private void deleteFiles() throws Exception {
-        for (int i = 0; i < NUM_FILES; i++) {
+        Assert.assertEquals(totalInStates, vbc.getUsage());
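+        // No free pages should be cached before any file is deleted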
+        int totalFree = 0;
+        Assert.assertEquals(totalFree, vbc.getPreAllocatedPages());
+        boolean hasLargePages = vbc.getLargePages() > 0;
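+        // Reclaimed large pages are not recycled into the free-page pool (see
+        // testLargePages), so the pre-allocated count is only checked when no
+        // large pages were pinned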
+        for (int i = 0; i < numFiles; i++) {
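+            // A large page occupies multiple frames, so count reclaimed memory
+            // using each page's frame size multiplier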
+            int expectedToBeReclaimed = 0;
+            for (ICachedPage page : fileStates[i].helper.pinnedPages) {
+                expectedToBeReclaimed += page.getFrameSizeMultiplier();
+            }
             vbc.deleteFile(fileStates[i].fileId);
+            totalFree += expectedToBeReclaimed;
+            Assert.assertEquals(totalInStates - totalFree, vbc.getUsage());
+            if (!hasLargePages) {
+                Assert.assertEquals(Integer.max(0, vbc.getPageBudget() - vbc.getUsage()), vbc.getPreAllocatedPages());
+            }
         }
     }
 
-    private void kPins(int k) throws Exception {
+    private void kPins(int k, int numFiles, FileState[] fileStates, VirtualBufferCache vbc, Random random)
+            throws Exception {
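+        // Pin k pages one at a time, choosing the target file at random each time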
         int numPinned = 0;
         while (numPinned < k) {
-            int fsIdx = random.nextInt(NUM_FILES);
+            int fsIdx = random.nextInt(numFiles);
             FileState f = fileStates[fsIdx];
-            ICachedPage p = vbc.pin(BufferedFileHandle.getDiskPageId(f.fileId, f.pinCount), true);
-            f.pinnedPages.add(p);
-            ++f.pinCount;
+            f.pin(1);
             ++numPinned;
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index e34ee0e..26ad457 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -78,10 +78,6 @@
 
         ICachedPage page = null;
 
-        // tryPin should fail
-        page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId));
-        Assert.assertNull(page);
-
         // pin page should succeed
         page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), true);
         page.acquireWriteLatch();
@@ -89,12 +85,6 @@
             for (int i = 0; i < num; i++) {
                 page.getBuffer().putInt(i * 4, i);
             }
-
-            // try pin should succeed
-            ICachedPage page2 = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId));
-            Assert.assertNotNull(page2);
-            bufferCache.unpin(page2);
-
         } finally {
             page.releaseWriteLatch(true);
             bufferCache.unpin(page);
@@ -102,31 +92,11 @@
 
         bufferCache.closeFile(fileId);
 
-        // This code is commented because the method pinSanityCheck in the BufferCache is commented.
-        /*boolean exceptionThrown = false;
-
-        // tryPin should fail since file is not open
-        try {
-            page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId));
-        } catch (HyracksDataException e) {
-            exceptionThrown = true;
-        }
-        Assert.assertTrue(exceptionThrown);
-
-        // pin should fail since file is not open
-        exceptionThrown = false;
-        try {
-            page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), false);
-        } catch (HyracksDataException e) {
-            exceptionThrown = true;
-        }
-        Assert.assertTrue(exceptionThrown);*/
-
         // open file again
         bufferCache.openFile(fileId);
 
-        // tryPin should succeed because page should still be cached
+        // pin without creating should succeed because the page should still be cached
-        page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId));
+        page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), false);
         Assert.assertNotNull(page);
         page.acquireReadLatch();
         try {