Merge branch 'master' into raman/master_feeds
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 1bd22d7..5ac2961 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -19,6 +19,8 @@
 import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -83,7 +85,11 @@
         TestStorageManagerComponentHolder.init(8192, 20, 20);
     }
 
-    protected static final int MERGE_THRESHOLD = 3;
+    protected static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
 
     protected IVirtualBufferCacheProvider virtualBufferCacheProvider = new TestVirtualBufferCacheProvider(
             DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
index fc932a3..8addae7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -26,7 +26,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -52,7 +52,7 @@
                 PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
 
         invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
-                virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+                virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
                 ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
                 NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index 75045b4..82de0c9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -24,7 +24,7 @@
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -47,9 +47,9 @@
                 PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
 
         invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(virtualBufferCacheProvider,
-                new ConstantMergePolicyProvider(MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
-                SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
-                DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
+                new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
+                ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
+                NoOpIOOperationCallback.INSTANCE, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
     }
 
     @Override
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index e98ecde..a4fd4c1 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -15,10 +15,13 @@
 
 package edu.uci.ics.hyracks.tests.am.lsm.btree;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -26,15 +29,19 @@
 
 public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
 
-    private static final int MERGE_THRESHOLD = 3;
-
+    private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
+    
     public LSMBTreeOperatorTestHelper(IOManager ioManager) {
         super(ioManager);
     }
 
     public IIndexDataflowHelperFactory createDataFlowHelperFactory() {
-        return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyProvider(
-                MERGE_THRESHOLD), ThreadCountingOperationTrackerProvider.INSTANCE,
+        return new LSMBTreeDataflowHelperFactory(virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+                MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
                 SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE,
                 DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
index e7478d6..79b8eb7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeOperatorTestHelper.java
@@ -15,12 +15,15 @@
 
 package edu.uci.ics.hyracks.tests.am.lsm.rtree;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
 
 public class LSMRTreeOperatorTestHelper extends LSMTreeOperatorTestHelper {
 
-    private static final int MERGE_THRESHOLD = 3;
+    private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
 
     public LSMRTreeOperatorTestHelper(IOManager ioManager) {
         super(ioManager);
@@ -40,7 +47,7 @@
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
             IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
         return new LSMRTreeDataflowHelperFactory(valueProviderFactories, rtreePolicyType, btreeComparatorFactories,
-                virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
+                virtualBufferCacheProvider, new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES,
                 ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
                 NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory, DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE);
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
index f883d90..fd6171a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesOperatorTestHelper.java
@@ -15,12 +15,15 @@
 
 package edu.uci.ics.hyracks.tests.am.lsm.rtree;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerProvider;
@@ -30,7 +33,11 @@
 
 public class LSMRTreeWithAntiMatterTuplesOperatorTestHelper extends LSMTreeOperatorTestHelper {
 
-    private static final int MERGE_THRESHOLD = 3;
+    private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
 
     public LSMRTreeWithAntiMatterTuplesOperatorTestHelper(IOManager ioManager) {
         super(ioManager);
@@ -40,9 +47,8 @@
             IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType,
             IBinaryComparatorFactory[] btreeComparatorFactories, ILinearizeComparatorFactory linearizerCmpFactory) {
         return new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, rtreePolicyType,
-                btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyProvider(MERGE_THRESHOLD),
-                ThreadCountingOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
-                NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
+                btreeComparatorFactories, virtualBufferCacheProvider, new ConstantMergePolicyFactory(),
+                MERGE_POLICY_PROPERTIES, ThreadCountingOperationTrackerProvider.INSTANCE,
+                SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, linearizerCmpFactory);
     }
-
 }
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 91207e8..b5c0a1d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -25,5 +25,6 @@
     PHYSICALDELETE,
     NOOP,
     MERGE,
+    FULL_MERGE,
     FLUSH
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
index 104a70d..0fdef13 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelperFactory.java
@@ -15,12 +15,14 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
     private static final long serialVersionUID = 1L;
 
     public LSMBTreeDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
-            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            double bloomFilterFalsePositiveRate) {
-        super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
-                ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+                ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
     }
 
     @Override
@@ -42,7 +44,7 @@
             int partition) {
         return new LSMBTreeDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
-                mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
-                ioOpCallbackFactory);
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 33ae7bb..3e51e20 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -242,10 +242,10 @@
     public void getOperationalComponents(ILSMIndexOperationContext ctx) {
         List<ILSMComponent> immutableComponents = diskComponents;
         List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
-        operationalComponents.clear();
         int cmc = currentMutableComponentId.get();
         ctx.setCurrentMutableComponentId(cmc);
         int numMutableComponents = memoryComponents.size();
+        operationalComponents.clear();
         switch (ctx.getOperation()) {
             case UPDATE:
             case UPSERT:
@@ -256,6 +256,7 @@
                 break;
             case SEARCH:
             case INSERT:
+
                 for (int i = 0; i < numMutableComponents - 1; i++) {
                     ILSMComponent c = memoryComponents.get((cmc + i + 1) % numMutableComponents);
                     LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) c;
@@ -269,6 +270,9 @@
                 operationalComponents.addAll(immutableComponents);
                 break;
             case MERGE:
+                operationalComponents.addAll(ctx.getComponentsToBeMerged());
+                break;
+            case FULL_MERGE:
                 operationalComponents.addAll(immutableComponents);
                 break;
             default:
@@ -423,8 +427,12 @@
         LSMBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         opCtx.setOperation(IndexOperation.MERGE);
         List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
-        ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx);
-
+        boolean returnDeletedTuples = false;
+        if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+                .size() - 1)) {
+            returnDeletedTuples = true;
+        }
+        ITreeIndexCursor cursor = new LSMBTreeRangeSearchCursor(opCtx, returnDeletedTuples);
         BTree firstBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(0)).getBTree();
         BTree lastBTree = (BTree) ((LSMBTreeDiskComponent) mergingComponents.get(mergingComponents.size() - 1))
                 .getBTree();
@@ -455,8 +463,8 @@
         int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
         BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
                 bloomFilterFalsePositiveRate);
-        LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory,
-                mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
+        LSMBTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getBTreeMergeTarget(),
+                mergeOp.getBloomFilterMergeTarget(), true);
 
         IIndexBulkLoader bulkLoader = mergedComponent.getBTree().createBulkLoader(1.0f, false, numElements, false);
         IIndexBulkLoader builder = mergedComponent.getBloomFilter().createBuilder(numElements,
@@ -476,9 +484,8 @@
         return mergedComponent;
     }
 
-    private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
-            FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
-            throws HyracksDataException, IndexException {
+    private LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory, FileReference btreeFileRef,
+            FileReference bloomFilterFileRef, boolean createComponent) throws HyracksDataException, IndexException {
         // Create new BTree instance.
         LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
                 .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -541,8 +548,8 @@
             } catch (HyracksDataException | IndexException e) {
                 throw new TreeIndexException(e);
             }
-            bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(
-                    fillFactor, verifyInput, numElementsHint, false);
+            bulkLoader = (BTreeBulkLoader) ((LSMBTreeDiskComponent) component).getBTree().createBulkLoader(fillFactor,
+                    verifyInput, numElementsHint, false);
 
             int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
             BloomFilterSpecification bloomFilterSpec = BloomCalculations.computeBloomSpec(maxBucketsPerElement,
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index 0b2d7cf..381b012 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -43,4 +43,9 @@
     public BloomFilter getBloomFilter() {
         return bloomFilter;
     }
+
+    @Override
+    public long getComponentSize() {
+        return btree.getFileReference().getFile().length() + bloomFilter.getFileReference().getFile().length();
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 6d2d7c0..cb7bae7 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -48,6 +48,7 @@
     public final IModificationOperationCallback modificationCallback;
     public final ISearchOperationCallback searchCallback;
     private final List<ILSMComponent> componentHolder;
+    private final List<ILSMComponent> componentsToBeMerged;
 
     public LSMBTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory,
             ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
@@ -84,6 +85,7 @@
             deleteLeafFrame.setMultiComparator(cmp);
         }
         this.componentHolder = new LinkedList<ILSMComponent>();
+        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
         this.modificationCallback = modificationCallback;
         this.searchCallback = searchCallback;
     }
@@ -107,6 +109,7 @@
     @Override
     public void reset() {
         componentHolder.clear();
+        componentsToBeMerged.clear();
     }
 
     public IndexOperation getOperation() {
@@ -153,4 +156,9 @@
                 break;
         }
     }
+
+    @Override
+    public List<ILSMComponent> getComponentsToBeMerged() {
+        return componentsToBeMerged;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 6eada4b..668a12c 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -49,7 +49,11 @@
     private boolean proceed = true;
 
     public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx) {
-        super(opCtx);
+        this(opCtx, false);
+    }
+
+    public LSMBTreeRangeSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+        super(opCtx, returnDeletedTuples);
         this.copyTuple = new ArrayTupleReference();
         this.reusablePred = new RangePredicate(null, null, true, true, null, null);
     }
@@ -126,7 +130,7 @@
                 }
                 // If there is no previous tuple or the previous tuple can be ignored
                 if (outputElement == null) {
-                    if (isDeleted(checkElement)) {
+                    if (isDeleted(checkElement) && !returnDeletedTuples) {
                         // If the key has been deleted then pop it and set needPush to true.
                         // We cannot push immediately because the tuple may be
                         // modified if hasNext() is called
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index bc7cbf7..1903998 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -37,6 +37,9 @@
     public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException;
 
+    public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException, IndexException;
+
     public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
             IndexException;
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 3405b60..36a2ca1 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
@@ -30,7 +32,10 @@
 public interface ILSMIndexAccessor extends IIndexAccessor {
     public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
 
-    public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
+    public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+            throws HyracksDataException, IndexException;
+
+    public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
 
     /**
      * Deletes the tuple from the memory component only.
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index fcd4037..80264bc 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -22,6 +22,8 @@
 
 public interface ILSMIndexOperationContext extends IIndexOperationContext {
     public List<ILSMComponent> getComponentHolder();
+    
+    public List<ILSMComponent> getComponentsToBeMerged();
 
     public ISearchOperationCallback getSearchOperationCallback();
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 1473071..279605f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -15,9 +15,14 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMMergePolicy {
-    public void diskComponentAdded(ILSMIndex index) throws HyracksDataException, IndexException;
+    public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+            IndexException;
+
+    public void configure(Map<String, String> properties);
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
similarity index 73%
rename from hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
rename to hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
index cf56750..c90b5f7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyFactory.java
@@ -15,9 +15,13 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
 import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
 
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+public interface ILSMMergePolicyFactory extends Serializable {
+    public ILSMMergePolicy createMergePolicy(Map<String, String> configuration);
 
-public interface ILSMMergePolicyProvider extends Serializable {
-    public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx);
-}
+    public String getName();
+
+    public Set<String> getPropertiesNames();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
index 2c082bb..4276ba7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
@@ -15,10 +15,12 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 
@@ -26,21 +28,23 @@
     protected static final long serialVersionUID = 1L;
 
     protected final IVirtualBufferCacheProvider virtualBufferCacheProvider;
-    protected final ILSMMergePolicyProvider mergePolicyProvider;
+    protected final ILSMMergePolicyFactory mergePolicyFactory;
+    protected final Map<String, String> mergePolicyProperties;
     protected final ILSMOperationTrackerProvider opTrackerFactory;
     protected final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
     protected final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
     protected final double bloomFilterFalsePositiveRate;
 
     public AbstractLSMIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
-            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerFactory,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            double bloomFilterFalsePositiveRate) {
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
         this.virtualBufferCacheProvider = virtualBufferCacheProvider;
-        this.mergePolicyProvider = mergePolicyProvider;
+        this.mergePolicyFactory = mergePolicyFactory;
         this.opTrackerFactory = opTrackerFactory;
         this.ioSchedulerProvider = ioSchedulerProvider;
         this.ioOpCallbackFactory = ioOpCallbackFactory;
         this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
+        this.mergePolicyProperties = mergePolicyProperties;
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
new file mode 100644
index 0000000..a61dec4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+
+public class LSMIndexCompactOperatorNodePushable extends AbstractOperatorNodePushable {
+    private final IIndexDataflowHelper indexHelper;
+
+    public LSMIndexCompactOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition) {
+        this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        indexHelper.close();
+    }
+
+    @Override
+    public int getInputArity() {
+        return 0;
+    }
+
+    @Override
+    public IFrameWriter getInputFrameWriter(int index) {
+        return null;
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        indexHelper.open();
+        ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+        ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+        try {
+            accessor.scheduleFullMerge(NoOpIOOperationCallback.INSTANCE);
+        } catch (Exception e) {
+            indexHelper.close();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
new file mode 100644
index 0000000..3c40f94
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMTreeIndexCompactOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public LSMTreeIndexCompactOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields,
+            IIndexDataflowHelperFactory dataflowHelperFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+        super(spec, 0, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false,
+                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                modificationOpCallbackProvider);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index bc6baeb..ec12c23 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -85,4 +85,6 @@
 
     protected abstract void destroy() throws HyracksDataException;
 
+    public abstract long getComponentSize();
+
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index b6f5657..8a6b8bd 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -15,27 +15,43 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.List;
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 
 public class ConstantMergePolicy implements ILSMMergePolicy {
+    private int numComponents;
 
-    private final int threshold;
-
-    public ConstantMergePolicy(int threshold) {
-        this.threshold = threshold;
+    @Override
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+            IndexException {
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return;
+            }
+        }
+        if (fullMergeIsRequested) {
+            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleFullMerge(index.getIOOperationCallback());
+        } else if (immutableComponents.size() >= numComponents) {
+            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+        }
     }
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
-        if (index.getImmutableComponents().size() >= threshold) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
-            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
-        }
+    public void configure(Map<String, String> properties) {
+        numComponents = Integer.parseInt(properties.get("num-components"));
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
new file mode 100644
index 0000000..13f5ad9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class ConstantMergePolicyFactory implements ILSMMergePolicyFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String[] SET_VALUES = new String[] { "num-components" };
+    private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+
+    @Override
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+        ILSMMergePolicy policy = new ConstantMergePolicy();
+        policy.configure(properties);
+        return policy;
+    }
+
+    @Override
+    public String getName() {
+        return "constant";
+    }
+
+    @Override
+    public Set<String> getPropertiesNames() {
+        return PROPERTIES_NAMES;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
deleted file mode 100644
index a7383c1..0000000
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
-
-public class ConstantMergePolicyProvider implements ILSMMergePolicyProvider {
-
-    private static final long serialVersionUID = 1L;
-
-    private final int threshold;
-
-    public ConstantMergePolicyProvider(int threshold) {
-        this.threshold = threshold;
-    }
-
-    @Override
-    public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx) {
-        return new ConstantMergePolicy(threshold);
-    }
-
-}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index ca775b7..443ad2b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -41,11 +42,13 @@
     private final ILSMIndexInternal lsmIndex;
     private final ILSMMergePolicy mergePolicy;
     private final ILSMOperationTracker opTracker;
+    private final AtomicBoolean fullMergeIsRequested;
 
     public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
         this.lsmIndex = lsmIndex;
         this.opTracker = opTracker;
         this.mergePolicy = mergePolicy;
+        fullMergeIsRequested = new AtomicBoolean();
     }
 
     private boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean isTryOperation)
@@ -180,13 +183,14 @@
                         // newComponent is null if the flush op. was not performed.
                         if (newComponent != null) {
                             lsmIndex.addComponent(newComponent);
-                            mergePolicy.diskComponentAdded(lsmIndex);
+                            mergePolicy.diskComponentAdded(lsmIndex, false);
                         }
                         break;
                     case MERGE:
                         // newComponent is null if the merge op. was not performed.
                         if (newComponent != null) {
                             lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+                            mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
                         }
                         break;
                     default:
@@ -287,7 +291,6 @@
     @Override
     public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException {
-        // Merge should always be a try operation, because it should never fail to enter the components unless the merge policy is erroneous.
         if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
             callback.afterFinalize(LSMOperationType.MERGE, null);
             return;
@@ -296,6 +299,20 @@
     }
 
     @Override
+    public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException, IndexException {
+        fullMergeIsRequested.set(true);
+        if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+            // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
+            // whenever the current merge has finished, it will schedule the full merge again.
+            callback.afterFinalize(LSMOperationType.MERGE, null);
+            return;
+        }
+        fullMergeIsRequested.set(false);
+        lsmIndex.scheduleMerge(ctx, callback);
+    }
+
+    @Override
     public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
             IndexException {
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -320,7 +337,7 @@
     public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
         lsmIndex.markAsValid(c);
         lsmIndex.addComponent(c);
-        mergePolicy.diskComponentAdded(lsmIndex);
+        mergePolicy.diskComponentAdded(lsmIndex, false);
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index 45cc69b..2bc45a9 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -42,11 +42,13 @@
     protected boolean includeMutableComponent;
     protected ILSMHarness lsmHarness;
     protected final ILSMIndexOperationContext opCtx;
+    protected final boolean returnDeletedTuples;
 
     protected List<ILSMComponent> operationalComponents;
 
-    public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx) {
+    public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
         this.opCtx = opCtx;
+        this.returnDeletedTuples = returnDeletedTuples;
         outputElement = null;
         needPush = false;
     }
@@ -110,14 +112,14 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (lsmHarness != null) {
-            try {
-                outputPriorityQueue.clear();
-                for (int i = 0; i < rangeCursors.length; i++) {
-                    rangeCursors[i].close();
-                }
-                rangeCursors = null;
-            } finally {
+        try {
+            outputPriorityQueue.clear();
+            for (int i = 0; i < rangeCursors.length; i++) {
+                rangeCursors[i].close();
+            }
+            rangeCursors = null;
+        } finally {
+            if (lsmHarness != null) {
                 lsmHarness.endSearch(opCtx);
             }
         }
@@ -154,7 +156,50 @@
         return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
     }
 
-    abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
+    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+        while (!outputPriorityQueue.isEmpty() || needPush == true) {
+            if (!outputPriorityQueue.isEmpty()) {
+                PriorityQueueElement checkElement = outputPriorityQueue.peek();
+                // If there is no previous tuple or the previous tuple can be ignored
+                if (outputElement == null) {
+                    if (isDeleted(checkElement) && !returnDeletedTuples) {
+                        // If the key has been deleted then pop it and set needPush to true.
+                        // We cannot push immediately because the tuple may be
+                        // modified if hasNext() is called
+                        outputElement = outputPriorityQueue.poll();
+                        needPush = true;
+                    } else {
+                        break;
+                    }
+                } else {
+                    // Compare the previous tuple and the head tuple in the PQ
+                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+                        // If the previous tuple and the head tuple are
+                        // identical
+                        // then pop the head tuple and push the next tuple from
+                        // the tree of head tuple
+
+                        // the head element of PQ is useless now
+                        PriorityQueueElement e = outputPriorityQueue.poll();
+                        pushIntoPriorityQueue(e);
+                    } else {
+                        // If the previous tuple and the head tuple are different
+                        // the info of previous tuple is useless
+                        if (needPush == true) {
+                            pushIntoPriorityQueue(outputElement);
+                            needPush = false;
+                        }
+                        outputElement = null;
+                    }
+                }
+            } else {
+                // the priority queue is empty and needPush
+                pushIntoPriorityQueue(outputElement);
+                needPush = false;
+                outputElement = null;
+            }
+        }
+    }
 
     @Override
     public boolean exclusiveLatchNodes() {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index f11a061..c828bd2 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -15,12 +15,15 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -116,12 +119,21 @@
     }
 
     @Override
-    public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+    public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+            throws HyracksDataException, IndexException {
         ctx.setOperation(IndexOperation.MERGE);
+        ctx.getComponentsToBeMerged().clear();
+        ctx.getComponentsToBeMerged().addAll(components);
         lsmHarness.scheduleMerge(ctx, callback);
     }
 
     @Override
+    public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+        ctx.setOperation(IndexOperation.FULL_MERGE);
+        lsmHarness.scheduleFullMerge(ctx, callback);
+    }
+
+    @Override
     public void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
         ctx.setOperation(IndexOperation.PHYSICALDELETE);
         lsmHarness.forceModify(ctx, tuple);
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 17d1b17..ca22268 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 
@@ -21,8 +23,12 @@
     INSTANCE;
 
     @Override
-    public void diskComponentAdded(ILSMIndex index) {
+    public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) {
         // Do nothing
     }
 
+    @Override
+    public void configure(Map<String, String> properties) {
+        // Do nothing
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
new file mode 100644
index 0000000..fe04db6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+
+public class PrefixMergePolicy implements ILSMMergePolicy {
+
+    private long maxMergableComponentSize;
+    private int maxTolernaceComponentCount;
+
+    @Override
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
+            IndexException {
+        // 1.  Look at the candidate components for merging in oldest-first order.  If one exists, identify the prefix of the sequence of
+        // all such components for which the sum of their sizes exceeds MaxMrgCompSz.  Schedule a merge of those components into a new component.
+        // 2.  If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt.  If so, schedule
+        // a merge all of the current candidates into a new single component.
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return;
+            }
+        }
+        if (fullMergeIsRequested) {
+            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleFullMerge(index.getIOOperationCallback());
+            return;
+        }
+        long totalSize = 0;
+        int startIndex = -1;
+        for (int i = 0; i < immutableComponents.size(); i++) {
+            ILSMComponent c = immutableComponents.get(i);
+            long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+            if (componentSize > maxMergableComponentSize) {
+                startIndex = i;
+                totalSize = 0;
+                continue;
+            }
+            totalSize += componentSize;
+            boolean isLastComponent = i + 1 == immutableComponents.size() ? true : false;
+            if (totalSize > maxMergableComponentSize
+                    || (isLastComponent && i - startIndex >= maxTolernaceComponentCount)) {
+                List<ILSMComponent> mergableCopments = new ArrayList<ILSMComponent>();
+                for (int j = startIndex + 1; j <= i; j++) {
+                    mergableCopments.add(immutableComponents.get(j));
+                }
+                ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                        NoOpOperationCallback.INSTANCE);
+                accessor.scheduleMerge(index.getIOOperationCallback(), mergableCopments);
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void configure(Map<String, String> properties) {
+        maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
+        maxTolernaceComponentCount = Integer.parseInt(properties.get("max-tolernace-component-count"));
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
new file mode 100644
index 0000000..981ec6c
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/PrefixMergePolicyFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class PrefixMergePolicyFactory implements ILSMMergePolicyFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String[] SET_VALUES = new String[] { "max-mergable-component-size",
+            "max-tolernace-component-count" };
+    private static final Set<String> PROPERTIES_NAMES = new HashSet<String>(Arrays.asList(SET_VALUES));
+
+    @Override
+    public ILSMMergePolicy createMergePolicy(Map<String, String> properties) {
+        ILSMMergePolicy policy = new PrefixMergePolicy();
+        policy.configure(properties);
+        return policy;
+    }
+
+    @Override
+    public String getName() {
+        return "prefix";
+    }
+
+    @Override
+    public Set<String> getPropertiesNames() {
+        return PROPERTIES_NAMES;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
new file mode 100644
index 0000000..22e7505
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexCompactOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexCompactOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMInvertedIndexCompactOperator extends AbstractLSMInvertedIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public LSMInvertedIndexCompactOperator(IOperatorDescriptorRegistry spec, IStorageManagerInterface storageManager,
+            IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
+            ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
+            IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory dataflowHelperFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+        super(spec, 1, 1, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+                tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+                dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
+                NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new LSMIndexCompactOperatorNodePushable(this, ctx, partition);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
index 84c7150..c018f16 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
     private static final long serialVersionUID = 1L;
 
     public LSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
-            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            double bloomFilterFalsePositiveRate) {
-        super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
-                ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider,
+                ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
     }
 
     @Override
@@ -42,8 +44,8 @@
             int partition) {
         return new LSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
-                mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
-                ioOpCallbackFactory);
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
     }
 
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
index d78ae7e..aef0863 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
@@ -15,12 +15,14 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -30,11 +32,11 @@
     private static final long serialVersionUID = 1L;
 
     public PartitionedLSMInvertedIndexDataflowHelperFactory(IVirtualBufferCacheProvider virtualBufferCacheProvider,
-            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerProvider opTrackerProvider,
-            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            double bloomFilterFalsePositiveRate) {
-        super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider,
-                ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, double bloomFilterFalsePositiveRate) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerProvider,
+                ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
     }
 
     @Override
@@ -42,7 +44,7 @@
             int partition) {
         return new PartitionedLSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
-                mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
-                ioOpCallbackFactory);
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index cef6aee..0600754 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -267,10 +267,10 @@
     public void getOperationalComponents(ILSMIndexOperationContext ctx) {
         List<ILSMComponent> immutableComponents = diskComponents;
         List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
-        operationalComponents.clear();
         int cmc = currentMutableComponentId.get();
         ctx.setCurrentMutableComponentId(cmc);
         int numMutableComponents = memoryComponents.size();
+        operationalComponents.clear();
         switch (ctx.getOperation()) {
             case FLUSH:
             case DELETE:
@@ -291,8 +291,10 @@
                 operationalComponents.addAll(immutableComponents);
                 break;
             case MERGE:
-                operationalComponents.addAll(immutableComponents);
+                operationalComponents.addAll(ctx.getComponentsToBeMerged());
                 break;
+            case FULL_MERGE:
+                operationalComponents.addAll(immutableComponents);
             default:
                 throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
         }
@@ -561,6 +563,31 @@
                 mergeOp.getBloomFilterMergeTarget(), true);
 
         IInvertedIndex mergedDiskInvertedIndex = component.getInvIndex();
+
+        // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the inverted indexes so that
+        // lsmHarness.endSearch() is called once when the inverted indexes have been merged.
+        if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+                .get(diskComponents.size() - 1)) {
+            // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+            LSMInvertedIndexDeletedKeysBTreeMergeCursor btreeCursor = new LSMInvertedIndexDeletedKeysBTreeMergeCursor(
+                    opCtx);
+            search(opCtx, btreeCursor, mergePred);
+
+            BTree btree = component.getDeletedKeysBTree();
+            IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+            try {
+                while (btreeCursor.hasNext()) {
+                    btreeCursor.next();
+                    ITupleReference tuple = btreeCursor.getTuple();
+                    btreeBulkLoader.add(tuple);
+                }
+            } finally {
+                btreeCursor.close();
+            }
+            btreeBulkLoader.end();
+        }
+
         IIndexBulkLoader invIndexBulkLoader = mergedDiskInvertedIndex.createBulkLoader(1.0f, true, 0L, false);
         try {
             while (cursor.hasNext()) {
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index e31af9a..7e34dfe 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -15,12 +15,15 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
 
+import java.util.List;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -84,12 +87,21 @@
     }
 
     @Override
-    public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+    public void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMComponent> components)
+            throws HyracksDataException, IndexException {
         ctx.setOperation(IndexOperation.MERGE);
+        ctx.getComponentsToBeMerged().clear();
+        ctx.getComponentsToBeMerged().addAll(components);
         lsmHarness.scheduleMerge(ctx, callback);
     }
 
     @Override
+    public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+        ctx.setOperation(IndexOperation.FULL_MERGE);
+        lsmHarness.scheduleFullMerge(ctx, callback);
+    }
+
+    @Override
     public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
         lsmHarness.merge(ctx, operation);
     }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..3efaaba
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+
+    public LSMInvertedIndexDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+        super(opCtx, true);
+    }
+
+    @Override
+    protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+        return false;
+    }
+
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+            IndexException {
+        LSMInvertedIndexRangeSearchCursorInitialState lsmInitialState = (LSMInvertedIndexRangeSearchCursorInitialState) initialState;
+        cmp = lsmInitialState.getOriginalKeyComparator();
+        operationalComponents = lsmInitialState.getOperationalComponents();
+        // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge the inverted indexes.
+        lsmHarness = null;
+        int numBTrees = operationalComponents.size();
+        rangeCursors = new IIndexCursor[numBTrees];
+
+        MultiComparator keyCmp = lsmInitialState.getKeyComparator();
+        RangePredicate btreePredicate = new RangePredicate(null, null, true, true, keyCmp, keyCmp);
+        ArrayList<IIndexAccessor> btreeAccessors = lsmInitialState.getDeletedKeysBTreeAccessors();
+        for (int i = 0; i < numBTrees; i++) {
+            rangeCursors[i] = btreeAccessors.get(i).createSearchCursor();
+            btreeAccessors.get(i).search(rangeCursors[i], btreePredicate);
+        }
+        setPriorityQueueComparator();
+        initPriorityQueue();
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index 323edd1..446e807 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
 
 public class LSMInvertedIndexDiskComponent extends AbstractDiskLSMComponent {
 
@@ -53,4 +54,12 @@
     public BloomFilter getBloomFilter() {
         return bloomFilter;
     }
+
+    @Override
+    public long getComponentSize() {
+        return ((OnDiskInvertedIndex) invIndex).getInvListsFile().getFile().length()
+                + ((OnDiskInvertedIndex) invIndex).getBTree().getFileReference().getFile().length()
+                + deletedKeysBTree.getFileReference().getFile().length()
+                + bloomFilter.getFileReference().getFile().length();
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 1a9303f..671e3f8 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -35,7 +35,8 @@
 
     private IndexOperation op;
     private final List<ILSMComponent> componentHolder;
-
+    private final List<ILSMComponent> componentsToBeMerged;
+    
     public final IModificationOperationCallback modificationCallback;
     public final ISearchOperationCallback searchCallback;
 
@@ -54,6 +55,7 @@
             IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback)
             throws HyracksDataException {
         this.componentHolder = new LinkedList<ILSMComponent>();
+        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
         this.modificationCallback = modificationCallback;
         this.searchCallback = searchCallback;
 
@@ -84,6 +86,7 @@
     @Override
     public void reset() {
         componentHolder.clear();
+        componentsToBeMerged.clear();
     }
 
     @Override
@@ -118,4 +121,9 @@
         currentMutableInvIndexAccessors = mutableInvIndexAccessors[currentMutableComponentId];
         currentDeletedKeysBTreeAccessors = deletedKeysBTreeAccessors[currentMutableComponentId];
     }
+    
+    @Override
+    public List<ILSMComponent> getComponentsToBeMerged() {
+        return componentsToBeMerged;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index cd0dde3..a6ff07e 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -28,8 +28,8 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BloomFilterAwareBTreePointSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
@@ -43,7 +43,7 @@
     protected RangePredicate keySearchPred;
 
     public LSMInvertedIndexRangeSearchCursor(ILSMIndexOperationContext opCtx) {
-        super(opCtx);
+        super(opCtx, false);
     }
 
     @Override
@@ -76,12 +76,12 @@
             for (int i = 0; i < operationalComponents.size(); i++) {
                 ILSMComponent component = operationalComponents.get(i);
                 if (component.getType() == LSMComponentType.MEMORY) {
-                 // No need for a bloom filter for the in-memory BTree.
+                    // No need for a bloom filter for the in-memory BTree.
                     deletedKeysBTreeCursors[i] = deletedKeysBTreeAccessors.get(i).createSearchCursor();
                 } else {
-                    deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor((IBTreeLeafFrame) lsmInitState
-                            .getgetDeletedKeysBTreeLeafFrameFactory().createFrame(), false,
-                            ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
+                    deletedKeysBTreeCursors[i] = new BloomFilterAwareBTreePointSearchCursor(
+                            (IBTreeLeafFrame) lsmInitState.getgetDeletedKeysBTreeLeafFrameFactory().createFrame(),
+                            false, ((LSMInvertedIndexDiskComponent) operationalComponents.get(i)).getBloomFilter());
                 }
             }
         }
@@ -114,50 +114,4 @@
         }
         return false;
     }
-
-    @Override
-    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
-        while (!outputPriorityQueue.isEmpty() || needPush == true) {
-            if (!outputPriorityQueue.isEmpty()) {
-                PriorityQueueElement checkElement = outputPriorityQueue.peek();
-                // If there is no previous tuple or the previous tuple can be ignored
-                if (outputElement == null) {
-                    if (isDeleted(checkElement)) {
-                        // If the key has been deleted then pop it and set needPush to true.
-                        // We cannot push immediately because the tuple may be
-                        // modified if hasNext() is called
-                        outputElement = outputPriorityQueue.poll();
-                        needPush = true;
-                    } else {
-                        break;
-                    }
-                } else {
-                    // Compare the previous tuple and the head tuple in the PQ
-                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
-                        // If the previous tuple and the head tuple are
-                        // identical
-                        // then pop the head tuple and push the next tuple from
-                        // the tree of head tuple
-
-                        // the head element of PQ is useless now
-                        PriorityQueueElement e = outputPriorityQueue.poll();
-                        pushIntoPriorityQueue(e);
-                    } else {
-                        // If the previous tuple and the head tuple are different
-                        // the info of previous tuple is useless
-                        if (needPush == true) {
-                            pushIntoPriorityQueue(outputElement);
-                            needPush = false;
-                        }
-                        outputElement = null;
-                    }
-                }
-            } else {
-                // the priority queue is empty and needPush
-                pushIntoPriorityQueue(outputElement);
-                needPush = false;
-                outputElement = null;
-            }
-        }
-    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 0ab2f4d..67d1796 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -462,6 +462,10 @@
     public BTree getBTree() {
         return btree;
     }
+    
+    public FileReference getInvListsFile() {
+        return invListsFile;
+    }
 
     public class OnDiskInvertedIndexAccessor implements IInvertedIndexAccessor {
         private final OnDiskInvertedIndex index;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
index 8254689..7b54f19 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeDataflowHelperFactory.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -23,7 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
@@ -40,12 +42,12 @@
 
     public LSMRTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
             RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
-            IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
-            ILSMOperationTrackerProvider opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory,
-            double bloomFilterFalsePositiveRate) {
-        super(virtualBufferCacheProvider, mergePolicyProvider, opTrackerFactory, ioSchedulerProvider,
-                ioOpCallbackFactory, bloomFilterFalsePositiveRate);
+            IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+            ILinearizeComparatorFactory linearizeCmpFactory, double bloomFilterFalsePositiveRate) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory,
+                ioSchedulerProvider, ioOpCallbackFactory, bloomFilterFalsePositiveRate);
         this.btreeComparatorFactories = btreeComparatorFactories;
         this.valueProviderFactories = valueProviderFactories;
         this.rtreePolicyType = rtreePolicyType;
@@ -58,7 +60,7 @@
         return new LSMRTreeDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), bloomFilterFalsePositiveRate,
                 btreeComparatorFactories, valueProviderFactories, rtreePolicyType,
-                mergePolicyProvider.getMergePolicy(ctx), opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx),
-                ioOpCallbackFactory, linearizeCmpFactory);
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties), opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
index ef34876..2e1cfaa 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterTuplesDataflowHelperFactory.java
@@ -15,47 +15,42 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow;
 
+import java.util.Map;
+
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
 
-public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory implements IIndexDataflowHelperFactory {
+public class LSMRTreeWithAntiMatterTuplesDataflowHelperFactory extends AbstractLSMIndexDataflowHelperFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private final IVirtualBufferCacheProvider virtualBufferCacheProvider;
     private final IBinaryComparatorFactory[] btreeComparatorFactories;
     private final IPrimitiveValueProviderFactory[] valueProviderFactories;
     private final RTreePolicyType rtreePolicyType;
-    private final ILSMMergePolicyProvider mergePolicyProvider;
-    private final ILSMOperationTrackerProvider opTrackerProvider;
-    private final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
-    private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
     private final ILinearizeComparatorFactory linearizeCmpFactory;
 
     public LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
             RTreePolicyType rtreePolicyType, IBinaryComparatorFactory[] btreeComparatorFactories,
-            IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyProvider mergePolicyProvider,
-            ILSMOperationTrackerProvider opTrackerProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILinearizeComparatorFactory linearizeCmpFactory) {
-        this.virtualBufferCacheProvider = virtualBufferCacheProvider;
+            IVirtualBufferCacheProvider virtualBufferCacheProvider, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ILSMOperationTrackerProvider opTrackerFactory,
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+            ILinearizeComparatorFactory linearizeCmpFactory) {
+        super(virtualBufferCacheProvider, mergePolicyFactory, mergePolicyProperties, opTrackerFactory, ioSchedulerProvider,
+                ioOpCallbackFactory, 1.0);
         this.btreeComparatorFactories = btreeComparatorFactories;
         this.valueProviderFactories = valueProviderFactories;
         this.rtreePolicyType = rtreePolicyType;
-        this.mergePolicyProvider = mergePolicyProvider;
-        this.ioSchedulerProvider = ioSchedulerProvider;
-        this.opTrackerProvider = opTrackerProvider;
-        this.ioOpCallbackFactory = ioOpCallbackFactory;
         this.linearizeCmpFactory = linearizeCmpFactory;
     }
 
@@ -64,7 +59,7 @@
             int partition) {
         return new LSMRTreeWithAntiMatterTuplesDataflowHelper(opDesc, ctx, partition,
                 virtualBufferCacheProvider.getVirtualBufferCaches(ctx), btreeComparatorFactories,
-                valueProviderFactories, rtreePolicyType, mergePolicyProvider.getMergePolicy(ctx), opTrackerProvider,
-                ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
+                valueProviderFactories, rtreePolicyType, mergePolicyFactory.createMergePolicy(mergePolicyProperties),
+                opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx), ioOpCallbackFactory, linearizeCmpFactory);
     }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index b09e115..673c7ae 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -24,18 +24,15 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
-import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -200,11 +197,11 @@
     @Override
     public void getOperationalComponents(ILSMIndexOperationContext ctx) {
         List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
-        operationalComponents.clear();
         List<ILSMComponent> immutableComponents = diskComponents;
         int cmc = currentMutableComponentId.get();
         ctx.setCurrentMutableComponentId(cmc);
         int numMutableComponents = memoryComponents.size();
+        operationalComponents.clear();
         switch (ctx.getOperation()) {
             case INSERT:
             case DELETE:
@@ -225,8 +222,10 @@
                 operationalComponents.addAll(immutableComponents);
                 break;
             case MERGE:
-                operationalComponents.addAll(immutableComponents);
+                operationalComponents.addAll(ctx.getComponentsToBeMerged());
                 break;
+            case FULL_MERGE:
+                operationalComponents.addAll(immutableComponents);
             default:
                 throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
         }
@@ -332,38 +331,11 @@
         ctx.modificationCallback.before(tuple);
         ctx.modificationCallback.found(null, tuple);
         if (ctx.getOperation() == IndexOperation.INSERT) {
-            // Before each insert, we must check whether there exist a killer
-            // tuple in the memBTree. If we find a killer tuple, we must truly
-            // delete the existing tuple from the BTree, and then insert it to
-            // memRTree. Otherwise, the old killer tuple will kill the newly
-            // added RTree tuple.
-            RangePredicate btreeRangePredicate = new RangePredicate(tuple, tuple, true, true,
-                    ctx.getBTreeMultiComparator(), ctx.getBTreeMultiComparator());
-            ITreeIndexCursor cursor = ctx.currentMutableBTreeAccessor.createSearchCursor();
-            ctx.currentMutableBTreeAccessor.search(cursor, btreeRangePredicate);
-            boolean foundTupleInMemoryBTree = false;
-            try {
-                if (cursor.hasNext()) {
-                    foundTupleInMemoryBTree = true;
-                }
-            } finally {
-                cursor.close();
-            }
-            if (foundTupleInMemoryBTree) {
-                try {
-                    ctx.currentMutableBTreeAccessor.delete(tuple);
-                } catch (TreeIndexNonExistentKeyException e) {
-                    // Tuple has been deleted in the meantime. Do nothing.
-                    // This normally shouldn't happen if we are dealing with
-                    // good citizens since LSMRTree is used as a secondary
-                    // index and a tuple shouldn't be deleted twice without
-                    // insert between them.
-                }
-            } else {
-                ctx.currentMutableRTreeAccessor.insert(tuple);
-            }
-
+            ctx.currentMutableRTreeAccessor.insert(tuple);
         } else {
+            // First remove all entries in the in-memory rtree (if any).
+            ctx.currentMutableRTreeAccessor.delete(tuple);
+            // Insert key into the deleted-keys BTree.
             try {
                 ctx.currentMutableBTreeAccessor.insert(tuple);
             } catch (TreeIndexDuplicateKeyException e) {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index a4b6139..662aa02 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -309,8 +309,31 @@
 
         LSMRTreeDiskComponent mergedComponent = createDiskComponent(componentFactory, mergeOp.getRTreeMergeTarget(),
                 mergeOp.getBTreeMergeTarget(), mergeOp.getBloomFilterMergeTarget(), true);
-        IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
 
+        // In case we must keep the deleted-keys BTrees, then they must be merged *before* merging the r-trees so that
+        // lsmHarness.endSearch() is called once when the r-trees have been merged.
+        if (mergeOp.getMergingComponents().get(mergeOp.getMergingComponents().size() - 1) != diskComponents
+                .get(diskComponents.size() - 1)) {
+            // Keep the deleted tuples since the oldest disk component is not included in the merge operation
+
+            LSMRTreeDeletedKeysBTreeMergeCursor btreeCursor = new LSMRTreeDeletedKeysBTreeMergeCursor(opCtx);
+            search(opCtx, btreeCursor, rtreeSearchPred);
+
+            BTree btree = mergedComponent.getBTree();
+            IIndexBulkLoader btreeBulkLoader = btree.createBulkLoader(1.0f, true, 0L, false);
+            try {
+                while (btreeCursor.hasNext()) {
+                    btreeCursor.next();
+                    ITupleReference tuple = btreeCursor.getTuple();
+                    btreeBulkLoader.add(tuple);
+                }
+            } finally {
+                btreeCursor.close();
+            }
+            btreeBulkLoader.end();
+        }
+
+        IIndexBulkLoader bulkLoader = mergedComponent.getRTree().createBulkLoader(1.0f, false, 0L, false);
         try {
             while (cursor.hasNext()) {
                 cursor.next();
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
new file mode 100644
index 0000000..fe2ed96
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+
+public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
+
+    public LSMRTreeDeletedKeysBTreeMergeCursor(ILSMIndexOperationContext opCtx) {
+        super(opCtx, true);
+    }
+
+    @Override
+    protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+        return false;
+    }
+
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException,
+            IndexException {
+        LSMRTreeCursorInitialState lsmInitialState = (LSMRTreeCursorInitialState) initialState;
+        cmp = lsmInitialState.getBTreeCmp();
+        operationalComponents = lsmInitialState.getOperationalComponents();
+        // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge r-trees.
+        lsmHarness = null;
+        int numBTrees = operationalComponents.size();
+        rangeCursors = new IIndexCursor[numBTrees];
+
+        RangePredicate btreePredicate = new RangePredicate(null, null, true, true, cmp, cmp);
+        IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
+        for (int i = 0; i < numBTrees; i++) {
+            ILSMComponent component = operationalComponents.get(i);
+            IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getBTreeLeafFrameFactory().createFrame();
+            rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
+            BTree btree = (BTree) ((LSMRTreeDiskComponent) component).getBTree();
+            btreeAccessors[i] = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            btreeAccessors[i].search(rangeCursors[i], btreePredicate);
+        }
+        setPriorityQueueComparator();
+        initPriorityQueue();
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 7bc3f79..c3c0a55 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -54,4 +54,14 @@
     public BloomFilter getBloomFilter() {
         return bloomFilter;
     }
+
+    @Override
+    public long getComponentSize() {
+        long size = rtree.getFileReference().getFile().length();
+        if (btree != null) {
+            size += btree.getFileReference().getFile().length();
+            size += bloomFilter.getFileReference().getFile().length();
+        }
+        return size;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index b94feba..132e55b 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -48,6 +48,7 @@
 
     private IndexOperation op;
     public final List<ILSMComponent> componentHolder;
+    private final List<ILSMComponent> componentsToBeMerged;
     public final IModificationOperationCallback modificationCallback;
     public final ISearchOperationCallback searchCallback;
 
@@ -76,6 +77,7 @@
         currentRTreeOpContext = rtreeOpContexts[0];
         currentBTreeOpContext = btreeOpContexts[0];
         this.componentHolder = new LinkedList<ILSMComponent>();
+        this.componentsToBeMerged = new LinkedList<ILSMComponent>();
         this.modificationCallback = modificationCallback;
         this.searchCallback = searchCallback;
     }
@@ -101,6 +103,7 @@
     @Override
     public void reset() {
         componentHolder.clear();
+        componentsToBeMerged.clear();
     }
 
     @Override
@@ -126,4 +129,9 @@
     public IModificationOperationCallback getModificationCallback() {
         return modificationCallback;
     }
+    
+    @Override
+    public List<ILSMComponent> getComponentsToBeMerged() {
+        return componentsToBeMerged;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
index 30dd467..f669585 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSearchCursor.java
@@ -79,7 +79,7 @@
                 ITupleReference currentTuple = rtreeCursors[currentCursor].getTuple();
 
                 boolean killerTupleFound = false;
-                for (int i = 0; i <= currentCursor; i++) {
+                for (int i = 0; i < currentCursor; i++) {
                     btreeCursors[i].reset();
                     btreeRangePredicate.setHighKey(currentTuple, true);
                     btreeRangePredicate.setLowKey(currentTuple, true);
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index dd31165..2e6fc78 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -25,6 +25,8 @@
 
 public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
 
+    // TODO: This class can be removed and instead use a search cursor that uses a logic similar 
+    // to the one in LSMRTreeWithAntiMatterTuplesSearchCursor
     private ILinearizeComparator linearizeCmp;
     private boolean[] depletedRtreeCursors;
     private int foundIn = -1;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 812e942..ec66a39a 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -40,7 +40,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
@@ -76,9 +75,9 @@
             ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
             ILSMIOOperationCallback ioOpCallback) {
         super(virtualBufferCaches, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory,
-                btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(diskRTreeFactory),
-                diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields,
-                linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallback);
+                btreeLeafFrameFactory, fileManager, new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(
+                        diskRTreeFactory), diskFileMapProvider, fieldCount, rtreeCmpFactories, btreeCmpFactories,
+                linearizer, comparatorFields, linearizerArray, 0, mergePolicy, opTracker, ioScheduler, ioOpCallback);
         bulkLoaComponentFactory = new LSMRTreeWithAntiMatterTuplesDiskComponentFactory(bulkLoadRTreeFactory);
         this.bTreeTupleSorter = null;
     }
@@ -248,7 +247,12 @@
         LSMRTreeOpContext rctx = createOpContext(NoOpOperationCallback.INSTANCE);
         rctx.setOperation(IndexOperation.MERGE);
         List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
-        ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx);
+        boolean returnDeletedTuples = false;
+        if (ctx.getComponentHolder().get(ctx.getComponentHolder().size() - 1) != diskComponents.get(diskComponents
+                .size() - 1)) {
+            returnDeletedTuples = true;
+        }
+        ITreeIndexCursor cursor = new LSMRTreeWithAntiMatterTuplesSearchCursor(rctx, returnDeletedTuples);
         LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
         ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
         ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index cbaf3b3..7099d7d 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -54,7 +54,11 @@
     private int numMutableComponents;
 
     public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx) {
-        super(opCtx);
+        this(opCtx, false);
+    }
+
+    public LSMRTreeWithAntiMatterTuplesSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+        super(opCtx, returnDeletedTuples);
         currentCursor = 0;
     }
 
@@ -152,7 +156,7 @@
             while (super.hasNext()) {
                 super.next();
                 ITupleReference diskRTreeTuple = super.getTuple();
-                if (searchMemBTrees(diskRTreeTuple, numMutableComponents - 1)) {
+                if (searchMemBTrees(diskRTreeTuple, numMutableComponents)) {
                     foundNext = true;
                     frameTuple = diskRTreeTuple;
                     return true;
@@ -216,7 +220,7 @@
 
     private boolean searchMemBTrees(ITupleReference tuple, int lastBTreeToSearch) throws HyracksDataException,
             IndexException {
-        for (int i = 0; i <= lastBTreeToSearch; i++) {
+        for (int i = 0; i < lastBTreeToSearch; i++) {
             btreeCursors[i].reset();
             btreeRangePredicate.setHighKey(tuple, true);
             btreeRangePredicate.setLowKey(tuple, true);
@@ -261,50 +265,4 @@
             }
         }
     }
-
-    @Override
-    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
-        while (!outputPriorityQueue.isEmpty() || needPush == true) {
-            if (!outputPriorityQueue.isEmpty()) {
-                PriorityQueueElement checkElement = outputPriorityQueue.peek();
-                // If there is no previous tuple or the previous tuple can be ignored
-                if (outputElement == null) {
-                    if (isDeleted(checkElement)) {
-                        // If the key has been deleted then pop it and set needPush to true.
-                        // We cannot push immediately because the tuple may be
-                        // modified if hasNext() is called
-                        outputElement = outputPriorityQueue.poll();
-                        needPush = true;
-                    } else {
-                        break;
-                    }
-                } else {
-                    // Compare the previous tuple and the head tuple in the PQ
-                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
-                        // If the previous tuple and the head tuple are
-                        // identical
-                        // then pop the head tuple and push the next tuple from
-                        // the tree of head tuple
-
-                        // the head element of PQ is useless now
-                        PriorityQueueElement e = outputPriorityQueue.poll();
-                        pushIntoPriorityQueue(e);
-                    } else {
-                        // If the previous tuple and the head tuple are different
-                        // the info of previous tuple is useless
-                        if (needPush == true) {
-                            pushIntoPriorityQueue(outputElement);
-                            needPush = false;
-                        }
-                        outputElement = null;
-                    }
-                }
-            } else {
-                // the priority queue is empty and needPush
-                pushIntoPriorityQueue(outputElement);
-                needPush = false;
-                outputElement = null;
-            }
-        }
-    }
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 8090564..2992dfe 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -119,7 +119,8 @@
 
     @Override
     public ICachedPage tryPin(long dpid) throws HyracksDataException {
-        pinSanityCheck(dpid);
+        // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
+        //pinSanityCheck(dpid);
         CachedPage cPage = null;
         int hash = hash(dpid);
         CacheBucket bucket = pageMap[hash];
@@ -142,7 +143,8 @@
 
     @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
-        pinSanityCheck(dpid);
+        // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
+        //pinSanityCheck(dpid);
         CachedPage cPage = findPage(dpid, newPage);
         if (!newPage) {
             // Resolve race of multiple threads trying to read the page from
@@ -760,8 +762,8 @@
         BufferedFileHandle fInfo = null;
         synchronized (fileInfoMap) {
             fInfo = fileInfoMap.get(fileId);
-            ioManager.sync(fInfo.getFileHandle(), metadata);
         }
+        ioManager.sync(fInfo.getFileHandle(), metadata);
     }
 
     @Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
index f7aa7f4..9729d51 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeMergeTestDriver.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.OrderedIndexTestUtils;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
 import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 
@@ -70,7 +71,8 @@
             }
 
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
-            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                    ((LSMBTree) ctx.getIndex()).getImmutableComponents());
 
             orderedIndexTestUtils.checkPointSearches(ctx);
             orderedIndexTestUtils.checkScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
index 4a96131..35ecc20 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/multithread/LSMBTreeTestWorker.java
@@ -106,7 +106,7 @@
                 break;
 
             case MERGE:
-                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmBTree.getImmutableComponents());
                 break;
 
             default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
index 5115b7e..3713498 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
                 invIndex.activate();
             }
             // Perform merge.
-            invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                    ((LSMInvertedIndex) invIndex).getImmutableComponents());
             validateAndCheckIndex(testCtx);
             runTinySearchWorkload(testCtx, tupleGen);
         }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
index 523557d..53076fa 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
@@ -54,7 +55,8 @@
                 invIndex.activate();
             }
             // Perform merge.
-            invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            invIndexAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                    ((LSMInvertedIndex) invIndex).getImmutableComponents());
             validateAndCheckIndex(testCtx);
             runTinySearchWorkload(testCtx, tupleGen);
         }
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
index e1570af..35afd09 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexTestWorker.java
@@ -109,7 +109,7 @@
             }
 
             case MERGE: {
-                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, invIndex.getImmutableComponents());
                 break;
             }
 
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 18528c4..a410b2a 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestContext;
 import edu.uci.ics.hyracks.storage.am.rtree.AbstractRTreeTestDriver;
 import edu.uci.ics.hyracks.storage.am.rtree.RTreeTestUtils;
@@ -72,7 +73,8 @@
             }
 
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
-            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                    ((AbstractLSMRTree) ctx.getIndex()).getImmutableComponents());
 
             rTreeTestUtils.checkScan(ctx);
             rTreeTestUtils.checkDiskOrderScan(ctx);
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
index b3fddc8..c7a86d9 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeTestWorker.java
@@ -73,7 +73,7 @@
                 break;
 
             case MERGE:
-                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, lsmRTree.getImmutableComponents());
                 break;
 
             default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
index 12d7742..383cbc4 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/multithread/LSMRTreeWithAntiMatterTuplesTestWorker.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.storage.am.common.datagen.DataGenThread;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeWithAntiMatterTuples.LSMRTreeWithAntiMatterTuplesAccessor;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 
@@ -61,7 +62,8 @@
                 break;
 
             case MERGE:
-                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+                accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE,
+                        ((AbstractLSMRTree) lsmRTree).getImmutableComponents());
                 break;
 
             default:
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
index 7e71aea..c67386e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
@@ -96,8 +96,9 @@
         }
 
         bufferCache.closeFile(fileId);
-
-        boolean exceptionThrown = false;
+        
+        // This code is commented because the method pinSanityCheck in the BufferCache is commented.
+        /*boolean exceptionThrown = false;
 
         // tryPin should fail since file is not open
         try {
@@ -114,7 +115,7 @@
         } catch (HyracksDataException e) {
             exceptionThrown = true;
         }
-        Assert.assertTrue(exceptionThrown);
+        Assert.assertTrue(exceptionThrown);*/
 
         // open file again
         bufferCache.openFile(fileId);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 4863378..a577bdb 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -25,7 +25,9 @@
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.logging.Logger;
 
@@ -74,7 +76,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvider;
@@ -134,6 +136,12 @@
     protected int frameSize = ClusterConfig.getFrameSize();
     protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
+    private static final Map<String, String> MERGE_POLICY_PROPERTIES;
+    static {
+        MERGE_POLICY_PROPERTIES = new HashMap<String, String>();
+        MERGE_POLICY_PROPERTIES.put("num-components", "3");
+    }
+
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
 
@@ -480,9 +488,9 @@
 
     protected IIndexDataflowHelperFactory getIndexDataflowHelperFactory() {
         if (BspUtils.useLSM(conf)) {
-            return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(), new ConstantMergePolicyProvider(
-                    3), NoOpOperationTrackerProvider.INSTANCE, SynchronousSchedulerProvider.INSTANCE,
-                    NoOpIOOperationCallback.INSTANCE, 0.01);
+            return new LSMBTreeDataflowHelperFactory(new VirtualBufferCacheProvider(),
+                    new ConstantMergePolicyFactory(), MERGE_POLICY_PROPERTIES, NoOpOperationTrackerProvider.INSTANCE,
+                    SynchronousSchedulerProvider.INSTANCE, NoOpIOOperationCallback.INSTANCE, 0.01);
         } else {
             return new BTreeDataflowHelperFactory();
         }
@@ -739,8 +747,7 @@
          * connect operator descriptors
          */
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
-                sort, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), sort, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);